diff --git a/prisma/migrations/20260311030000_add_component_id_to_pipeline_metric/migration.sql b/prisma/migrations/20260311030000_add_component_id_to_pipeline_metric/migration.sql new file mode 100644 index 0000000..248d2cf --- /dev/null +++ b/prisma/migrations/20260311030000_add_component_id_to_pipeline_metric/migration.sql @@ -0,0 +1,5 @@ +-- AlterTable: Add componentId column to PipelineMetric for per-component latency +ALTER TABLE "PipelineMetric" ADD COLUMN "componentId" TEXT; + +-- CreateIndex: Compound index for per-component latency queries +CREATE INDEX "PipelineMetric_pipelineId_componentId_timestamp_idx" ON "PipelineMetric"("pipelineId", "componentId", "timestamp"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index fb9734d..abbff76 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -327,6 +327,7 @@ model PipelineMetric { pipelineId String pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade) nodeId String? + componentId String? // null = aggregate row, non-null = per-component timestamp DateTime eventsIn BigInt @default(0) eventsOut BigInt @default(0) @@ -335,9 +336,10 @@ model PipelineMetric { bytesIn BigInt @default(0) bytesOut BigInt @default(0) utilization Float @default(0) - latencyMeanMs Float? // pipeline-level weighted mean latency (ms) + latencyMeanMs Float? // pipeline-level weighted mean latency (ms) @@index([pipelineId, timestamp]) + @@index([pipelineId, componentId, timestamp]) @@index([timestamp]) } diff --git a/src/app/(dashboard)/page.tsx b/src/app/(dashboard)/page.tsx index a9b58a2..341064a 100644 --- a/src/app/(dashboard)/page.tsx +++ b/src/app/(dashboard)/page.tsx @@ -382,7 +382,7 @@ export default function DashboardPage() { height={200} /> } data={chartData.data?.pipeline.latency ?? {}} variant="area" diff --git a/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx b/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx index 5836a1a..c574c47 100644 --- a/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx +++ b/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx @@ -1,6 +1,6 @@ "use client"; -import { useState } from "react"; +import { useState, useMemo } from "react"; import { useParams } from "next/navigation"; import { useQuery } from "@tanstack/react-query"; import { useTRPC } from "@/trpc/client"; @@ -10,6 +10,9 @@ import { Skeleton } from "@/components/ui/skeleton"; import { SummaryCards } from "@/components/metrics/summary-cards"; import { MetricsChart } from "@/components/metrics/component-chart"; import { PipelineLogs } from "@/components/pipeline/pipeline-logs"; +import { LineChart, Line, XAxis, YAxis, CartesianGrid } from "recharts"; +import { ChartContainer, ChartTooltip, ChartTooltipContent, type ChartConfig } from "@/components/ui/chart"; +import { formatLatency } from "@/lib/format"; const TIME_RANGES = [ { label: "5m", minutes: 5 }, @@ -35,6 +38,13 @@ export default function PipelineMetricsPage() { ), ); + const componentLatencyQuery = useQuery( + trpc.metrics.getComponentLatencyHistory.queryOptions( + { pipelineId: params.id, minutes }, + { refetchInterval: 15000 }, + ), + ); + if (pipelineQuery.isLoading) { return (
@@ -118,10 +128,13 @@ export default function PipelineMetricsPage() { - Pipeline Latency + Transform Latency - + @@ -141,3 +154,103 @@ export default function PipelineMetricsPage() {
); } + +// Deterministic color palette for per-component latency lines +const LATENCY_COLORS = [ + "#ec4899", "#8b5cf6", "#3b82f6", "#06b6d4", "#10b981", + "#f59e0b", "#ef4444", "#6366f1", "#14b8a6", "#f97316", +]; + +function TransformLatencyChart({ + components, + height = 220, +}: { + components: Record>; + height?: number; +}) { + const { data, config, componentIds } = useMemo(() => { + const ids = Object.keys(components).sort(); + if (ids.length === 0) return { data: [], config: {} as ChartConfig, componentIds: [] }; + + // Collect all unique timestamps (keyed by epoch ms for correct sorting) + const timeMap = new Map>(); + for (const id of ids) { + for (const point of components[id]) { + const ms = new Date(point.timestamp).getTime(); + const entry = timeMap.get(ms) ?? {}; + entry[id] = point.latencyMeanMs; + timeMap.set(ms, entry); + } + } + + // Build chart data sorted by timestamp, display as locale time + const chartData = Array.from(timeMap.entries()) + .sort(([a], [b]) => a - b) + .map(([ms, values]) => ({ + time: new Date(ms).toLocaleTimeString([], { hour: "2-digit", minute: "2-digit" }), + ...values, + })); + + // Build chart config with deterministic colors + const chartConfig: ChartConfig = {}; + for (let i = 0; i < ids.length; i++) { + chartConfig[ids[i]] = { + label: ids[i], + color: LATENCY_COLORS[i % LATENCY_COLORS.length], + }; + } + + return { data: chartData, config: chartConfig, componentIds: ids }; + }, [components]); + + if (data.length === 0) { + return ( +
+

No transform latency data yet

+
+ ); + } + + return ( + + + + + formatLatency(v)} + /> + ( +
+ {String(name)} + + {formatLatency(Number(value) ?? 0)} + +
+ )} + /> + } + /> + {componentIds.map((id) => ( + + ))} +
+
+ ); +} diff --git a/src/app/api/agent/heartbeat/route.ts b/src/app/api/agent/heartbeat/route.ts index 7508e5d..f26a76b 100644 --- a/src/app/api/agent/heartbeat/route.ts +++ b/src/app/api/agent/heartbeat/route.ts @@ -343,6 +343,10 @@ export async function POST(request: Request) { .catch((err) => console.error("Node metrics insert error:", err)); } + // Shared minute-truncated timestamp for all metric rows this heartbeat + const minuteTimestamp = new Date(); + minuteTimestamp.setSeconds(0, 0); + // Ingest metrics from pipelines that report counter data const metricsData = pipelines .filter((p) => p.eventsIn !== undefined) @@ -365,6 +369,55 @@ export async function POST(request: Request) { ); } + // Write per-component latency rows (direct create, bypasses delta-tracking) + const componentLatencyRows: Array<{ + pipelineId: string; + nodeId: string; + componentId: string; + timestamp: Date; + latencyMeanMs: number; + }> = []; + + for (const ps of pipelines) { + if (!Array.isArray(ps.componentMetrics)) continue; + for (const cm of ps.componentMetrics) { + if (cm.latencyMeanSeconds != null && cm.latencyMeanSeconds > 0) { + componentLatencyRows.push({ + pipelineId: ps.pipelineId, + nodeId: agent.nodeId, + componentId: cm.componentId, + timestamp: minuteTimestamp, + latencyMeanMs: cm.latencyMeanSeconds * 1000, + }); + } + } + } + + if (componentLatencyRows.length > 0) { + try { + for (const row of componentLatencyRows) { + const existing = await prisma.pipelineMetric.findFirst({ + where: { + pipelineId: row.pipelineId, + nodeId: row.nodeId, + componentId: row.componentId, + timestamp: row.timestamp, + }, + }); + if (existing) { + await prisma.pipelineMetric.update({ + where: { id: existing.id }, + data: { latencyMeanMs: row.latencyMeanMs }, + }); + } else { + await prisma.pipelineMetric.create({ data: row }); + } + } + } catch (err) { + console.error("Per-component latency upsert error:", err); + } + } + // Feed per-component metrics into the in-memory MetricStore for editor overlays for (const ps of pipelines) { if (Array.isArray(ps.componentMetrics) && ps.componentMetrics.length > 0) { diff --git a/src/components/pipeline/metrics-chart.tsx b/src/components/pipeline/metrics-chart.tsx index b35a8cf..24f653f 100644 --- a/src/components/pipeline/metrics-chart.tsx +++ b/src/components/pipeline/metrics-chart.tsx @@ -168,7 +168,7 @@ export function PipelineMetricsChart({ pipelineId, hours = 24 }: PipelineMetrics {/* Latency chart */}
-

Pipeline Latency

+

Transform Latency

diff --git a/src/server/routers/dashboard.ts b/src/server/routers/dashboard.ts index dd189f0..a71407b 100644 --- a/src/server/routers/dashboard.ts +++ b/src/server/routers/dashboard.ts @@ -27,6 +27,7 @@ export const dashboardRouter = router({ prisma.pipelineMetric.aggregate({ where: { nodeId: null, // aggregate rows only + componentId: null, timestamp: { gte: oneHourAgo }, pipeline: { environmentId: input.environmentId }, }, @@ -279,6 +280,7 @@ export const dashboardRouter = router({ where: { pipelineId: { in: pipelineIds }, nodeId: null, + componentId: null, timestamp: { gte: oneHourAgo }, }, orderBy: { timestamp: "asc" }, @@ -470,6 +472,7 @@ export const dashboardRouter = router({ prisma.pipelineMetric.aggregate({ where: { nodeId: null, + componentId: null, timestamp: { gte: fiveMinAgo }, ...(user?.isSuperAdmin ? {} : { pipeline: teamFilter }), }, @@ -503,6 +506,7 @@ export const dashboardRouter = router({ const current = await prisma.pipelineMetric.aggregate({ where: { pipeline: { environmentId: input.environmentId }, + componentId: null, timestamp: { gte: since }, }, _sum: { eventsIn: true, eventsOut: true, bytesIn: true, bytesOut: true }, @@ -512,6 +516,7 @@ export const dashboardRouter = router({ const previous = await prisma.pipelineMetric.aggregate({ where: { pipeline: { environmentId: input.environmentId }, + componentId: null, timestamp: { gte: prevSince, lt: since }, }, _sum: { eventsIn: true, eventsOut: true, bytesIn: true, bytesOut: true }, @@ -522,6 +527,7 @@ export const dashboardRouter = router({ by: ["pipelineId"], where: { pipeline: { environmentId: input.environmentId }, + componentId: null, timestamp: { gte: since }, }, _sum: { eventsIn: true, eventsOut: true, bytesIn: true, bytesOut: true }, @@ -554,6 +560,7 @@ export const dashboardRouter = router({ const rawMetrics = await prisma.pipelineMetric.findMany({ where: { pipeline: { environmentId: input.environmentId }, + componentId: null, timestamp: { gte: since }, }, select: { @@ -666,6 +673,7 @@ export const dashboardRouter = router({ prisma.pipelineMetric.findMany({ where: { pipelineId: { in: effectivePipelineIds }, + componentId: null, timestamp: { gte: since }, ...(input.groupBy === "node" ? { nodeId: { in: effectiveNodeIds } } diff --git a/src/server/routers/metrics.ts b/src/server/routers/metrics.ts index 75583ef..7ccbc06 100644 --- a/src/server/routers/metrics.ts +++ b/src/server/routers/metrics.ts @@ -22,6 +22,7 @@ export const metricsRouter = router({ where: { pipelineId: input.pipelineId, nodeId: null, + componentId: null, timestamp: { gte: since }, }, orderBy: { timestamp: "asc" }, @@ -41,6 +42,62 @@ export const metricsRouter = router({ return { rows }; }), + /** + * Per-component historical latency from the database. + * Used by the pipeline metrics page for the multi-line transform latency chart. + */ + getComponentLatencyHistory: protectedProcedure + .input( + z.object({ + pipelineId: z.string(), + minutes: z.number().int().min(1).max(1440).default(60), + }), + ) + .use(withTeamAccess("VIEWER")) + .query(async ({ input }) => { + const since = new Date(Date.now() - input.minutes * 60 * 1000); + + const rows = await prisma.pipelineMetric.findMany({ + where: { + pipelineId: input.pipelineId, + componentId: { not: null }, + timestamp: { gte: since }, + }, + orderBy: { timestamp: "asc" }, + select: { + componentId: true, + timestamp: true, + latencyMeanMs: true, + }, + }); + + // Average across nodes per (componentId, timestamp) to handle multi-node deployments + const components: Record> = {}; + const acc: Record> = {}; + + for (const row of rows) { + if (!row.componentId || row.latencyMeanMs == null) continue; + const tsMs = row.timestamp.getTime(); + const byTs = acc[row.componentId] ?? new Map(); + const bucket = byTs.get(tsMs) ?? { sum: 0, count: 0 }; + bucket.sum += row.latencyMeanMs; + bucket.count++; + byTs.set(tsMs, bucket); + acc[row.componentId] = byTs; + } + + for (const [cid, byTs] of Object.entries(acc)) { + components[cid] = Array.from(byTs.entries()) + .map(([tsMs, { sum, count }]) => ({ + timestamp: new Date(tsMs), + latencyMeanMs: sum / count, + })) + .sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime()); + } + + return { components }; + }), + /** * Per-component live metrics from the in-memory store. * Used by the flow editor to overlay throughput on nodes. diff --git a/src/server/routers/pipeline.ts b/src/server/routers/pipeline.ts index b645d6f..3db25a3 100644 --- a/src/server/routers/pipeline.ts +++ b/src/server/routers/pipeline.ts @@ -1063,6 +1063,7 @@ export const pipelineRouter = router({ where: { pipelineId: input.pipelineId, nodeId: null, + componentId: null, timestamp: { gte: since }, }, orderBy: { timestamp: "asc" }, diff --git a/src/server/services/metrics-ingest.ts b/src/server/services/metrics-ingest.ts index 3920f99..e4c55ec 100644 --- a/src/server/services/metrics-ingest.ts +++ b/src/server/services/metrics-ingest.ts @@ -81,6 +81,7 @@ export async function ingestMetrics( where: { pipelineId: dp.pipelineId, nodeId: dp.nodeId, + componentId: null, timestamp: now, }, }); @@ -126,6 +127,7 @@ export async function ingestMetrics( where: { pipelineId, nodeId: { not: null }, + componentId: null, timestamp: now, }, }); @@ -162,6 +164,7 @@ export async function ingestMetrics( where: { pipelineId, nodeId: null, + componentId: null, timestamp: now, }, }); diff --git a/src/server/services/sli-evaluator.ts b/src/server/services/sli-evaluator.ts index 4a72c0a..60f3a9f 100644 --- a/src/server/services/sli-evaluator.ts +++ b/src/server/services/sli-evaluator.ts @@ -27,7 +27,7 @@ export async function evaluatePipelineHealth(pipelineId: string): Promise<{ // Use aggregate to avoid transferring all metric rows to the application const agg = await prisma.pipelineMetric.aggregate({ - where: { pipelineId, timestamp: { gte: since } }, + where: { pipelineId, componentId: null, timestamp: { gte: since } }, _sum: { eventsIn: true, errorsTotal: true, eventsDiscarded: true }, _count: true, }); @@ -76,7 +76,7 @@ export async function evaluatePipelineHealth(pipelineId: string): Promise<{ } case "latency_mean": { const latencyAgg = await prisma.pipelineMetric.aggregate({ - where: { pipelineId, timestamp: { gte: since }, latencyMeanMs: { not: null } }, + where: { pipelineId, componentId: null, timestamp: { gte: since }, latencyMeanMs: { not: null } }, _avg: { latencyMeanMs: true }, _count: true, });