Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- AlterTable: Add componentId column to PipelineMetric for per-component latency
ALTER TABLE "PipelineMetric" ADD COLUMN "componentId" TEXT;

-- CreateIndex: Compound index for per-component latency queries
CREATE INDEX "PipelineMetric_pipelineId_componentId_timestamp_idx" ON "PipelineMetric"("pipelineId", "componentId", "timestamp");
4 changes: 3 additions & 1 deletion prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ model PipelineMetric {
pipelineId String
pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade)
nodeId String?
componentId String? // null = aggregate row, non-null = per-component
timestamp DateTime
eventsIn BigInt @default(0)
eventsOut BigInt @default(0)
Expand All @@ -335,9 +336,10 @@ model PipelineMetric {
bytesIn BigInt @default(0)
bytesOut BigInt @default(0)
utilization Float @default(0)
latencyMeanMs Float? // pipeline-level weighted mean latency (ms)
latencyMeanMs Float? // pipeline-level weighted mean latency (ms)

@@index([pipelineId, timestamp])
@@index([pipelineId, componentId, timestamp])
@@index([timestamp])
}

Expand Down
2 changes: 1 addition & 1 deletion src/app/(dashboard)/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ export default function DashboardPage() {
height={200}
/>
<MetricChart
title="Pipeline Latency"
title="Transform Latency"
icon={<Timer className="h-4 w-4" />}
data={chartData.data?.pipeline.latency ?? {}}
variant="area"
Expand Down
119 changes: 116 additions & 3 deletions src/app/(dashboard)/pipelines/[id]/metrics/page.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"use client";

import { useState } from "react";
import { useState, useMemo } from "react";
import { useParams } from "next/navigation";
import { useQuery } from "@tanstack/react-query";
import { useTRPC } from "@/trpc/client";
Expand All @@ -10,6 +10,9 @@ import { Skeleton } from "@/components/ui/skeleton";
import { SummaryCards } from "@/components/metrics/summary-cards";
import { MetricsChart } from "@/components/metrics/component-chart";
import { PipelineLogs } from "@/components/pipeline/pipeline-logs";
import { LineChart, Line, XAxis, YAxis, CartesianGrid } from "recharts";
import { ChartContainer, ChartTooltip, ChartTooltipContent, type ChartConfig } from "@/components/ui/chart";
import { formatLatency } from "@/lib/format";

const TIME_RANGES = [
{ label: "5m", minutes: 5 },
Expand All @@ -35,6 +38,13 @@ export default function PipelineMetricsPage() {
),
);

const componentLatencyQuery = useQuery(
trpc.metrics.getComponentLatencyHistory.queryOptions(
{ pipelineId: params.id, minutes },
{ refetchInterval: 15000 },
),
);

if (pipelineQuery.isLoading) {
return (
<div className="space-y-6">
Expand Down Expand Up @@ -118,10 +128,13 @@ export default function PipelineMetricsPage() {

<Card>
<CardHeader>
<CardTitle>Pipeline Latency</CardTitle>
<CardTitle>Transform Latency</CardTitle>
</CardHeader>
<CardContent>
<MetricsChart rows={rows} dataKey="latency" height={220} />
<TransformLatencyChart
components={componentLatencyQuery.data?.components ?? {}}
height={220}
/>
</CardContent>
</Card>
</>
Expand All @@ -141,3 +154,103 @@ export default function PipelineMetricsPage() {
</div>
);
}

// Deterministic color palette for per-component latency lines
const LATENCY_COLORS = [
"#ec4899", "#8b5cf6", "#3b82f6", "#06b6d4", "#10b981",
"#f59e0b", "#ef4444", "#6366f1", "#14b8a6", "#f97316",
];

function TransformLatencyChart({
components,
height = 220,
}: {
components: Record<string, Array<{ timestamp: Date; latencyMeanMs: number }>>;
height?: number;
}) {
const { data, config, componentIds } = useMemo(() => {
const ids = Object.keys(components).sort();
if (ids.length === 0) return { data: [], config: {} as ChartConfig, componentIds: [] };

// Collect all unique timestamps (keyed by epoch ms for correct sorting)
const timeMap = new Map<number, Record<string, number>>();
for (const id of ids) {
for (const point of components[id]) {
const ms = new Date(point.timestamp).getTime();
const entry = timeMap.get(ms) ?? {};
entry[id] = point.latencyMeanMs;
timeMap.set(ms, entry);
}
}

// Build chart data sorted by timestamp, display as locale time
const chartData = Array.from(timeMap.entries())
.sort(([a], [b]) => a - b)
.map(([ms, values]) => ({
time: new Date(ms).toLocaleTimeString([], { hour: "2-digit", minute: "2-digit" }),
...values,
}));

// Build chart config with deterministic colors
const chartConfig: ChartConfig = {};
for (let i = 0; i < ids.length; i++) {
chartConfig[ids[i]] = {
label: ids[i],
color: LATENCY_COLORS[i % LATENCY_COLORS.length],
};
}

return { data: chartData, config: chartConfig, componentIds: ids };
}, [components]);

if (data.length === 0) {
return (
<div
className="flex flex-col items-center justify-center text-muted-foreground"
style={{ height }}
>
<p className="text-sm">No transform latency data yet</p>
</div>
);
}

return (
<ChartContainer config={config} className="w-full" style={{ height }}>
<LineChart data={data}>
<CartesianGrid strokeDasharray="3 3" className="stroke-muted" />
<XAxis dataKey="time" tick={{ fontSize: 10 }} interval="preserveStartEnd" />
<YAxis
tick={{ fontSize: 10 }}
width={55}
tickFormatter={(v) => formatLatency(v)}
/>
<ChartTooltip
content={
<ChartTooltipContent
formatter={(value, name) => (
<div className="flex w-full items-center justify-between gap-2">
<span className="text-muted-foreground">{String(name)}</span>
<span className="font-mono font-medium text-foreground">
{formatLatency(Number(value) ?? 0)}
</span>
</div>
)}
/>
}
/>
{componentIds.map((id) => (
<Line
key={id}
type="monotone"
dataKey={id}
name={id}
stroke={config[id]?.color ?? "#888"}
strokeWidth={1.5}
dot={false}
connectNulls
/>
))}
</LineChart>
</ChartContainer>
);
}
53 changes: 53 additions & 0 deletions src/app/api/agent/heartbeat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ export async function POST(request: Request) {
.catch((err) => console.error("Node metrics insert error:", err));
}

// Shared minute-truncated timestamp for all metric rows this heartbeat
const minuteTimestamp = new Date();
minuteTimestamp.setSeconds(0, 0);

// Ingest metrics from pipelines that report counter data
const metricsData = pipelines
.filter((p) => p.eventsIn !== undefined)
Expand All @@ -365,6 +369,55 @@ export async function POST(request: Request) {
);
}

// Write per-component latency rows (direct create, bypasses delta-tracking)
const componentLatencyRows: Array<{
pipelineId: string;
nodeId: string;
componentId: string;
timestamp: Date;
latencyMeanMs: number;
}> = [];

for (const ps of pipelines) {
if (!Array.isArray(ps.componentMetrics)) continue;
for (const cm of ps.componentMetrics) {
if (cm.latencyMeanSeconds != null && cm.latencyMeanSeconds > 0) {
componentLatencyRows.push({
pipelineId: ps.pipelineId,
nodeId: agent.nodeId,
componentId: cm.componentId,
timestamp: minuteTimestamp,
latencyMeanMs: cm.latencyMeanSeconds * 1000,
});
}
}
}

if (componentLatencyRows.length > 0) {
try {
for (const row of componentLatencyRows) {
const existing = await prisma.pipelineMetric.findFirst({
where: {
pipelineId: row.pipelineId,
nodeId: row.nodeId,
componentId: row.componentId,
timestamp: row.timestamp,
},
});
if (existing) {
await prisma.pipelineMetric.update({
where: { id: existing.id },
data: { latencyMeanMs: row.latencyMeanMs },
});
} else {
await prisma.pipelineMetric.create({ data: row });
}
}
} catch (err) {
console.error("Per-component latency upsert error:", err);
}
}

// Feed per-component metrics into the in-memory MetricStore for editor overlays
for (const ps of pipelines) {
if (Array.isArray(ps.componentMetrics) && ps.componentMetrics.length > 0) {
Expand Down
2 changes: 1 addition & 1 deletion src/components/pipeline/metrics-chart.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ export function PipelineMetricsChart({ pipelineId, hours = 24 }: PipelineMetrics

{/* Latency chart */}
<div>
<p className="text-xs text-muted-foreground mb-1 font-medium">Pipeline Latency</p>
<p className="text-xs text-muted-foreground mb-1 font-medium">Transform Latency</p>
<ChartContainer config={latencyChartConfig} className="w-full" style={{ height: 180 }}>
<AreaChart data={data}>
<CartesianGrid strokeDasharray="3 3" className="stroke-muted" />
Expand Down
8 changes: 8 additions & 0 deletions src/server/routers/dashboard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export const dashboardRouter = router({
prisma.pipelineMetric.aggregate({
where: {
nodeId: null, // aggregate rows only
componentId: null,
timestamp: { gte: oneHourAgo },
pipeline: { environmentId: input.environmentId },
},
Expand Down Expand Up @@ -279,6 +280,7 @@ export const dashboardRouter = router({
where: {
pipelineId: { in: pipelineIds },
nodeId: null,
componentId: null,
timestamp: { gte: oneHourAgo },
},
orderBy: { timestamp: "asc" },
Expand Down Expand Up @@ -470,6 +472,7 @@ export const dashboardRouter = router({
prisma.pipelineMetric.aggregate({
where: {
nodeId: null,
componentId: null,
timestamp: { gte: fiveMinAgo },
...(user?.isSuperAdmin ? {} : { pipeline: teamFilter }),
},
Expand Down Expand Up @@ -503,6 +506,7 @@ export const dashboardRouter = router({
const current = await prisma.pipelineMetric.aggregate({
where: {
pipeline: { environmentId: input.environmentId },
componentId: null,
timestamp: { gte: since },
},
_sum: { eventsIn: true, eventsOut: true, bytesIn: true, bytesOut: true },
Expand All @@ -512,6 +516,7 @@ export const dashboardRouter = router({
const previous = await prisma.pipelineMetric.aggregate({
where: {
pipeline: { environmentId: input.environmentId },
componentId: null,
timestamp: { gte: prevSince, lt: since },
},
_sum: { eventsIn: true, eventsOut: true, bytesIn: true, bytesOut: true },
Expand All @@ -522,6 +527,7 @@ export const dashboardRouter = router({
by: ["pipelineId"],
where: {
pipeline: { environmentId: input.environmentId },
componentId: null,
timestamp: { gte: since },
},
_sum: { eventsIn: true, eventsOut: true, bytesIn: true, bytesOut: true },
Expand Down Expand Up @@ -554,6 +560,7 @@ export const dashboardRouter = router({
const rawMetrics = await prisma.pipelineMetric.findMany({
where: {
pipeline: { environmentId: input.environmentId },
componentId: null,
timestamp: { gte: since },
},
select: {
Expand Down Expand Up @@ -666,6 +673,7 @@ export const dashboardRouter = router({
prisma.pipelineMetric.findMany({
where: {
pipelineId: { in: effectivePipelineIds },
componentId: null,
timestamp: { gte: since },
...(input.groupBy === "node"
? { nodeId: { in: effectiveNodeIds } }
Expand Down
57 changes: 57 additions & 0 deletions src/server/routers/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export const metricsRouter = router({
where: {
pipelineId: input.pipelineId,
nodeId: null,
componentId: null,
timestamp: { gte: since },
},
orderBy: { timestamp: "asc" },
Expand All @@ -41,6 +42,62 @@ export const metricsRouter = router({
return { rows };
}),

/**
* Per-component historical latency from the database.
* Used by the pipeline metrics page for the multi-line transform latency chart.
*/
getComponentLatencyHistory: protectedProcedure
.input(
z.object({
pipelineId: z.string(),
minutes: z.number().int().min(1).max(1440).default(60),
}),
)
.use(withTeamAccess("VIEWER"))
.query(async ({ input }) => {
const since = new Date(Date.now() - input.minutes * 60 * 1000);

const rows = await prisma.pipelineMetric.findMany({
where: {
pipelineId: input.pipelineId,
componentId: { not: null },
timestamp: { gte: since },
},
orderBy: { timestamp: "asc" },
select: {
componentId: true,
timestamp: true,
latencyMeanMs: true,
},
});

// Average across nodes per (componentId, timestamp) to handle multi-node deployments
const components: Record<string, Array<{ timestamp: Date; latencyMeanMs: number }>> = {};
const acc: Record<string, Map<number, { sum: number; count: number }>> = {};

for (const row of rows) {
if (!row.componentId || row.latencyMeanMs == null) continue;
const tsMs = row.timestamp.getTime();
const byTs = acc[row.componentId] ?? new Map();
const bucket = byTs.get(tsMs) ?? { sum: 0, count: 0 };
bucket.sum += row.latencyMeanMs;
bucket.count++;
byTs.set(tsMs, bucket);
acc[row.componentId] = byTs;
}

for (const [cid, byTs] of Object.entries(acc)) {
components[cid] = Array.from(byTs.entries())
.map(([tsMs, { sum, count }]) => ({
timestamp: new Date(tsMs),
latencyMeanMs: sum / count,
}))
.sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime());
}

return { components };
}),

/**
* Per-component live metrics from the in-memory store.
* Used by the flow editor to overlay throughput on nodes.
Expand Down
Loading
Loading