diff --git a/prisma/migrations/20260311020000_add_node_status_event/migration.sql b/prisma/migrations/20260311020000_add_node_status_event/migration.sql new file mode 100644 index 00000000..a71f6c7d --- /dev/null +++ b/prisma/migrations/20260311020000_add_node_status_event/migration.sql @@ -0,0 +1,20 @@ +-- CreateTable +CREATE TABLE "NodeStatusEvent" ( + "id" TEXT NOT NULL, + "nodeId" TEXT NOT NULL, + "fromStatus" TEXT, + "toStatus" TEXT NOT NULL, + "reason" TEXT, + "timestamp" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "NodeStatusEvent_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "NodeStatusEvent_nodeId_timestamp_idx" ON "NodeStatusEvent"("nodeId", "timestamp"); + +-- CreateIndex +CREATE INDEX "NodeStatusEvent_timestamp_idx" ON "NodeStatusEvent"("timestamp"); + +-- AddForeignKey +ALTER TABLE "NodeStatusEvent" ADD CONSTRAINT "NodeStatusEvent_nodeId_fkey" FOREIGN KEY ("nodeId") REFERENCES "VectorNode"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index dc1b8d9c..fb9734d3 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -173,6 +173,7 @@ model VectorNode { nodeMetrics NodeMetric[] pipelineLogs PipelineLog[] alertEvents AlertEvent[] + statusEvents NodeStatusEvent[] createdAt DateTime @default(now()) } @@ -201,6 +202,19 @@ model NodeMetric { @@index([timestamp]) } +model NodeStatusEvent { + id String @id @default(cuid()) + nodeId String + node VectorNode @relation(fields: [nodeId], references: [id], onDelete: Cascade) + fromStatus String? // null on first event (node enrolled) + toStatus String // HEALTHY, DEGRADED, UNREACHABLE + reason String? // e.g. 
"heartbeat timeout", "heartbeat received", "enrolled" + timestamp DateTime @default(now()) + + @@index([nodeId, timestamp]) + @@index([timestamp]) +} + enum NodeStatus { HEALTHY DEGRADED diff --git a/src/app/(dashboard)/fleet/[nodeId]/page.tsx b/src/app/(dashboard)/fleet/[nodeId]/page.tsx index c3a2fbfd..fd67adbc 100644 --- a/src/app/(dashboard)/fleet/[nodeId]/page.tsx +++ b/src/app/(dashboard)/fleet/[nodeId]/page.tsx @@ -4,7 +4,7 @@ import { useParams, useRouter } from "next/navigation"; import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; import { useTRPC } from "@/trpc/client"; import Link from "next/link"; -import { ShieldOff, Trash2, Activity, Terminal, Server, Pencil, Check, X, Wrench, Plus, Tag } from "lucide-react"; +import { ShieldOff, Trash2, Activity, Pencil, Check, X, Wrench, Plus, Tag } from "lucide-react"; import { NodeLogs } from "@/components/fleet/node-logs"; import { toast } from "sonner"; import { useState } from "react"; @@ -23,8 +23,11 @@ import { TableRow, } from "@/components/ui/table"; import { Skeleton } from "@/components/ui/skeleton"; -import { Separator } from "@/components/ui/separator"; +import { Tabs, TabsList, TabsTrigger, TabsContent } from "@/components/ui/tabs"; import { NodeMetricsCharts } from "@/components/fleet/node-metrics-charts"; +import { UptimeCards } from "@/components/fleet/uptime-cards"; +import { StatusTimeline } from "@/components/fleet/status-timeline"; +import { EventLog } from "@/components/fleet/event-log"; import { formatTimestamp as formatLastSeen, formatCount, @@ -62,6 +65,7 @@ export default function NodeDetailPage() { const [editName, setEditName] = useState(""); const [isEditingLabels, setIsEditingLabels] = useState(false); const [editLabels, setEditLabels] = useState>([]); + const [timelineRange, setTimelineRange] = useState<"1h" | "6h" | "1d" | "7d" | "30d">("1d"); const nodeQuery = useQuery( trpc.fleet.get.queryOptions( @@ -341,266 +345,284 @@ export default function 
NodeDetailPage() { )} -
- {/* Node Details */} - - - Node Details - - -
-
-

Status

- - {nodeStatusLabel(node.status)} - -
-
-

Environment

-

{node.environment.name}

-
-
-

Agent Version

-

{node.agentVersion ?? "\u2014"}

-
-
-

Vector Version

-

{node.vectorVersion ?? "Unknown"}

-
-
-

Last Heartbeat

-

{formatLastSeen(node.lastHeartbeat)}

-
-
-

Enrolled

-

{node.enrolledAt ? formatLastSeen(node.enrolledAt) : "Not enrolled"}

-
-
-

Host

-

{node.host}

-
-
-

API Port

-

{node.apiPort}

-
-
-

Last Seen

-

{formatLastSeen(node.lastSeen)}

-
-
-

Created

-

- {new Date(node.createdAt).toLocaleDateString()} -

-
-
-
-
- - {/* Node Labels */} - - - - - - Labels - - {!isEditingLabels && ( - - )} - - - - {isEditingLabels ? ( -
- {editLabels.map((label, idx) => ( -
- { - const next = [...editLabels]; - next[idx] = { ...next[idx], key: e.target.value }; - setEditLabels(next); - }} - className="flex-1" - /> - = - { - const next = [...editLabels]; - next[idx] = { ...next[idx], value: e.target.value }; - setEditLabels(next); - }} - className="flex-1" - /> + + + Overview + Health + Metrics + Logs + + + +
+ {/* Node Details */} + + + Node Details + + +
+
+

Status

+
+ + {nodeStatusLabel(node.status)} + + {node.currentStatusSince && ( + + for {formatLastSeen(node.currentStatusSince)} + + )} +
+
+
+

Environment

+

{node.environment.name}

+
+
+

Agent Version

+

{node.agentVersion ?? "\u2014"}

+
+
+

Vector Version

+

{node.vectorVersion ?? "Unknown"}

+
+
+

Last Heartbeat

+

{formatLastSeen(node.lastHeartbeat)}

+
+
+

Enrolled

+

{node.enrolledAt ? formatLastSeen(node.enrolledAt) : "Not enrolled"}

+
+
+

Host

+

{node.host}

+
+
+

API Port

+

{node.apiPort}

+
+
+

Last Seen

+

{formatLastSeen(node.lastSeen)}

+
+
+

Created

+

+ {new Date(node.createdAt).toLocaleDateString()} +

+
+
+
+
+ + {/* Node Labels */} + + + + + + Labels + + {!isEditingLabels && ( + + )} + + + + {isEditingLabels ? ( +
+ {editLabels.map((label, idx) => ( +
+ { + const next = [...editLabels]; + next[idx] = { ...next[idx], key: e.target.value }; + setEditLabels(next); + }} + className="flex-1" + /> + = + { + const next = [...editLabels]; + next[idx] = { ...next[idx], value: e.target.value }; + setEditLabels(next); + }} + className="flex-1" + /> + +
+ ))} +
+ + +
- ))} - -
- - -
-
- ) : ( -
- {Object.entries((node.labels as Record) ?? {}).length > 0 ? ( - Object.entries((node.labels as Record) ?? {}).map( - ([k, v]) => ( - - {k}={v} - - ), - ) ) : ( -

No labels assigned

+
+ {Object.entries((node.labels as Record) ?? {}).length > 0 ? ( + Object.entries((node.labels as Record) ?? {}).map( + ([k, v]) => ( + + {k}={v} + + ), + ) + ) : ( +

No labels assigned

+ )} +
)} -
- )} - - - -
- - - - {/* System Resources */} -
-

- - System Resources -

- -
- - - - {/* Pipeline Metrics */} - - - - - Pipeline Metrics - - - - {node.pipelineStatuses && node.pipelineStatuses.length > 0 ? ( - - - - Pipeline - Status - Events In - Events Out - Errors - Bytes In - Bytes Out - Avg Latency - Uptime - - - - {node.pipelineStatuses.map((ps) => { - const rates = pipelineRates[ps.pipelineId]; - return ( - - - {ps.pipeline?.name ?? ps.pipelineId.slice(0, 8)} - - - - {pipelineStatusLabel(ps.status)} - - - -
{formatCount(ps.eventsIn)}
- {rates &&
{formatRate(rates.eventsInRate)}
} -
- -
{formatCount(ps.eventsOut)}
- {rates &&
{formatRate(rates.eventsOutRate)}
} -
- -
{formatCount(ps.errorsTotal)}
- {rates && rates.errorsRate > 0 &&
{formatRate(rates.errorsRate)}
} -
- -
{formatBytes(ps.bytesIn)}
- {rates &&
{formatBytesRate(rates.bytesInRate)}
} -
- -
{formatBytes(ps.bytesOut)}
- {rates &&
{formatBytesRate(rates.bytesOutRate)}
} -
- - {rates?.latencyMeanMs != null - ? formatLatency(rates.latencyMeanMs) - : "—"} - - - {formatUptime(ps.uptimeSeconds)} - -
- ); - })} -
-
- ) : ( -
-

- No pipeline metrics yet -

-

- Metrics appear after pipelines are deployed and the agent reports heartbeats. -

-
- )} -
-
- - {/* Logs */} -
-

- - Logs -

- ({ - id: ps.pipelineId, - name: ps.pipeline?.name ?? ps.pipelineId.slice(0, 8), - })) ?? [] - } - /> -
+ + + + {/* Pipeline Metrics */} + + + + + Pipeline Metrics + + + + {node.pipelineStatuses && node.pipelineStatuses.length > 0 ? ( + + + + Pipeline + Status + Events In + Events Out + Errors + Bytes In + Bytes Out + Avg Latency + Uptime + + + + {node.pipelineStatuses.map((ps) => { + const rates = pipelineRates[ps.pipelineId]; + return ( + + + {ps.pipeline?.name ?? ps.pipelineId.slice(0, 8)} + + + + {pipelineStatusLabel(ps.status)} + + + +
{formatCount(ps.eventsIn)}
+ {rates &&
{formatRate(rates.eventsInRate)}
} +
+ +
{formatCount(ps.eventsOut)}
+ {rates &&
{formatRate(rates.eventsOutRate)}
} +
+ +
{formatCount(ps.errorsTotal)}
+ {rates && rates.errorsRate > 0 &&
{formatRate(rates.errorsRate)}
} +
+ +
{formatBytes(ps.bytesIn)}
+ {rates &&
{formatBytesRate(rates.bytesInRate)}
} +
+ +
{formatBytes(ps.bytesOut)}
+ {rates &&
{formatBytesRate(rates.bytesOutRate)}
} +
+ + {rates?.latencyMeanMs != null + ? formatLatency(rates.latencyMeanMs) + : "—"} + + + {formatUptime(ps.uptimeSeconds)} + +
+ ); + })} +
+
+ ) : ( +
+

+ No pipeline metrics yet +

+

+ Metrics appear after pipelines are deployed and the agent reports heartbeats. +

+
+ )} +
+
+
+ + + +
+ + + + + Event Log + + + + + +
+
+ + + + + + + ({ + id: ps.pipelineId, + name: ps.pipeline?.name ?? ps.pipelineId.slice(0, 8), + })) ?? [] + } + /> + +
); } diff --git a/src/app/api/agent/enroll/route.ts b/src/app/api/agent/enroll/route.ts index 3082f184..60ab30be 100644 --- a/src/app/api/agent/enroll/route.ts +++ b/src/app/api/agent/enroll/route.ts @@ -83,6 +83,15 @@ export async function POST(request: Request) { }); debugLog("enroll", `SUCCESS -- node ${node.id} enrolled in "${matchedEnv.name}"`); + await prisma.nodeStatusEvent.create({ + data: { + nodeId: node.id, + fromStatus: null, + toStatus: "HEALTHY", + reason: "enrolled", + }, + }); + void fireEventAlert("node_joined", matchedEnv.id, { message: `Node "${hostname}" enrolled in environment "${matchedEnv.name}"`, nodeId: node.id, diff --git a/src/app/api/agent/heartbeat/route.ts b/src/app/api/agent/heartbeat/route.ts index 0391adab..7508e5d9 100644 --- a/src/app/api/agent/heartbeat/route.ts +++ b/src/app/api/agent/heartbeat/route.ts @@ -171,6 +171,12 @@ export async function POST(request: Request) { } } + // Fetch current status before update so we can record a transition event + const prevNode = await prisma.vectorNode.findUnique({ + where: { id: agent.nodeId }, + select: { status: true }, + }); + // Update node heartbeat and metadata const node = await prisma.vectorNode.update({ where: { id: agent.nodeId }, @@ -189,6 +195,18 @@ export async function POST(request: Request) { }, }); + // Record a status transition event when the node recovers from a non-HEALTHY state + if (prevNode && prevNode.status !== "HEALTHY") { + await prisma.nodeStatusEvent.create({ + data: { + nodeId: agent.nodeId, + fromStatus: prevNode.status, + toStatus: "HEALTHY", + reason: "heartbeat received", + }, + }); + } + // Merge agent-reported labels with existing UI-set labels. // UI-set labels take precedence over agent-reported labels. 
// Uses a single atomic operation to avoid TOCTOU race with fleet.updateLabels: diff --git a/src/components/fleet/event-log.tsx b/src/components/fleet/event-log.tsx new file mode 100644 index 00000000..b225e065 --- /dev/null +++ b/src/components/fleet/event-log.tsx @@ -0,0 +1,87 @@ +"use client"; + +import { useQuery } from "@tanstack/react-query"; +import { useTRPC } from "@/trpc/client"; +import { Skeleton } from "@/components/ui/skeleton"; + +type Range = "1h" | "6h" | "1d" | "7d" | "30d"; + +interface EventLogProps { + nodeId: string; + range: Range; +} + +const STATUS_COLORS: Record = { + HEALTHY: "#22c55e", + UNREACHABLE: "#ef4444", + DEGRADED: "#f59e0b", + UNKNOWN: "#6b7280", +}; + +function statusColor(status: string | null | undefined): string { + return STATUS_COLORS[status ?? "UNKNOWN"] ?? "#6b7280"; +} + +function formatTime(date: Date | string): string { + return new Date(date).toLocaleTimeString([], { + hour: "2-digit", + minute: "2-digit", + }); +} + +export function EventLog({ nodeId, range }: EventLogProps) { + const trpc = useTRPC(); + + const { data: events, isLoading } = useQuery({ + ...trpc.fleet.getStatusTimeline.queryOptions({ nodeId, range }), + refetchInterval: 15_000, + }); + + if (isLoading) { + return ( +
+ + + +
+ ); + } + + const reversed = events ? [...events].reverse() : []; + + if (reversed.length === 0) { + return ( +

+ No events in this time range. +

+ ); + } + + return ( +
+ {reversed.map((event) => ( +
+ {/* Colored dot for toStatus */} + + {/* Timestamp */} + + {formatTime(event.timestamp)} + + {/* Transition */} + + {event.fromStatus === null + ? "enrolled" + : `${event.fromStatus} → ${event.toStatus}`} + + {/* Reason */} + {event.reason && ( + {event.reason} + )} +
+ ))} +
+ ); +} diff --git a/src/components/fleet/status-timeline.tsx b/src/components/fleet/status-timeline.tsx new file mode 100644 index 00000000..565b823b --- /dev/null +++ b/src/components/fleet/status-timeline.tsx @@ -0,0 +1,152 @@ +"use client"; + +import { useMemo } from "react"; +import { useQuery } from "@tanstack/react-query"; +import { useTRPC } from "@/trpc/client"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select"; +import { Skeleton } from "@/components/ui/skeleton"; +import { + Tooltip, + TooltipContent, + TooltipProvider, + TooltipTrigger, +} from "@/components/ui/tooltip"; + +type Range = "1h" | "6h" | "1d" | "7d" | "30d"; + +interface StatusTimelineProps { + nodeId: string; + range: Range; + onRangeChange: (range: Range) => void; +} + +const STATUS_COLORS: Record = { + HEALTHY: "#22c55e", + UNREACHABLE: "#ef4444", + DEGRADED: "#f59e0b", + UNKNOWN: "#6b7280", +}; + +function statusColor(status: string | null | undefined): string { + return STATUS_COLORS[status ?? "UNKNOWN"] ?? "#6b7280"; +} + +function formatTime(date: Date | string): string { + return new Date(date).toLocaleTimeString([], { + hour: "2-digit", + minute: "2-digit", + }); +} + +function formatDuration(ms: number): string { + const minutes = Math.round(ms / 60_000); + if (minutes < 60) return `${minutes}min`; + const hours = Math.floor(minutes / 60); + const rem = minutes % 60; + return rem > 0 ? 
`${hours}h ${rem}min` : `${hours}h`; +} + +export function StatusTimeline({ nodeId, range, onRangeChange }: StatusTimelineProps) { + const trpc = useTRPC(); + + const { data: events, isLoading, dataUpdatedAt } = useQuery({ + ...trpc.fleet.getStatusTimeline.queryOptions({ nodeId, range }), + refetchInterval: 15_000, + }); + + type Segment = { + status: string; + start: number; + end: number; + }; + + const { segments, totalMs } = useMemo(() => { + const rangeMs: Record = { + "1h": 60 * 60 * 1000, + "6h": 6 * 60 * 60 * 1000, + "1d": 24 * 60 * 60 * 1000, + "7d": 7 * 24 * 60 * 60 * 1000, + "30d": 30 * 24 * 60 * 60 * 1000, + }; + // Use dataUpdatedAt as "now" — it's a stable value from React Query + // that updates each time data is fetched, keeping segments aligned with data + const now = dataUpdatedAt || 0; + const rangeStart = now - rangeMs[range]; + const segs: Segment[] = []; + + if (events !== undefined && now > 0) { + if (events.length === 0) { + segs.push({ status: "UNKNOWN", start: rangeStart, end: now }); + } else { + // First segment: from range start to first event + const firstStatus = events[0].fromStatus ?? "UNKNOWN"; + segs.push({ status: firstStatus, start: rangeStart, end: new Date(events[0].timestamp).getTime() }); + + // Middle segments + for (let i = 0; i < events.length; i++) { + const segStart = new Date(events[i].timestamp).getTime(); + const segEnd = i + 1 < events.length ? new Date(events[i + 1].timestamp).getTime() : now; + segs.push({ status: events[i].toStatus, start: segStart, end: segEnd }); + } + } + } + + return { segments: segs, totalMs: now - rangeStart }; + }, [events, range, dataUpdatedAt]); + + return ( +
+
+ Status Timeline + +
+ + {isLoading ? ( + + ) : ( + +
+ {segments.map((seg, i) => { + const duration = seg.end - seg.start; + const flexGrow = duration / totalMs; + const label = `${seg.status} for ${formatDuration(duration)} (${formatTime(new Date(seg.start))} – ${formatTime(new Date(seg.end))})`; + return ( + + +
+ + + {label} + + + ); + })} +
+ + )} +
+ ); +} diff --git a/src/components/fleet/uptime-cards.tsx b/src/components/fleet/uptime-cards.tsx new file mode 100644 index 00000000..e84c893d --- /dev/null +++ b/src/components/fleet/uptime-cards.tsx @@ -0,0 +1,72 @@ +"use client"; + +import { useQuery } from "@tanstack/react-query"; +import { useTRPC } from "@/trpc/client"; +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; +import { Skeleton } from "@/components/ui/skeleton"; + +interface UptimeCardsProps { + nodeId: string; +} + +function uptimeColor(percent: number): string { + if (percent >= 99.5) return "text-green-600"; + if (percent >= 99.0) return "text-amber-600"; + return "text-red-600"; +} + +function UptimeCard({ + title, + nodeId, + range, +}: { + title: string; + nodeId: string; + range: "1d" | "7d" | "30d"; +}) { + const trpc = useTRPC(); + + const { data, isLoading } = useQuery({ + ...trpc.fleet.getUptime.queryOptions({ nodeId, range }), + refetchInterval: 15_000, + }); + + return ( + + + {title} + + + {isLoading ? ( +
+ + +
+ ) : data == null ? ( +
+

+
+ ) : ( +
+

+ {data.uptimePercent.toFixed(2)}% +

+

+ {data.incidents} incident{data.incidents !== 1 ? "s" : ""} +

+
+ )} +
+
+ ); +} + +export function UptimeCards({ nodeId }: UptimeCardsProps) { + return ( +
+ + + +
+  );
+}
diff --git a/src/server/routers/fleet.ts b/src/server/routers/fleet.ts
index 2a82f09b..57d711e1 100644
--- a/src/server/routers/fleet.ts
+++ b/src/server/routers/fleet.ts
@@ -46,7 +46,116 @@ export const fleetRouter = router({
           message: "Node not found",
         });
       }
-      return node;
+      // The newest event whose toStatus equals the node's current status marks
+      // when that status began; null when no such event has been recorded.
+      const latestEvent = await prisma.nodeStatusEvent.findFirst({
+        where: { nodeId: input.id, toStatus: node.status },
+        orderBy: { timestamp: "desc" },
+        select: { timestamp: true },
+      });
+      return {
+        ...node,
+        currentStatusSince: latestEvent?.timestamp ?? null,
+      };
+    }),
+
+  // Returns the node's status events inside the requested range, oldest first.
+  getStatusTimeline: protectedProcedure
+    .input(z.object({
+      nodeId: z.string(),
+      range: z.enum(["1h", "6h", "1d", "7d", "30d"]),
+    }))
+    .use(withTeamAccess("VIEWER"))
+    .query(async ({ input }) => {
+      // `as const` keeps the keys tied to the input enum, so a range added to
+      // the enum without a duration here fails type-checking.
+      const rangeMs = {
+        "1h": 60 * 60 * 1000,
+        "6h": 6 * 60 * 60 * 1000,
+        "1d": 24 * 60 * 60 * 1000,
+        "7d": 7 * 24 * 60 * 60 * 1000,
+        "30d": 30 * 24 * 60 * 60 * 1000,
+      } as const;
+      const since = new Date(Date.now() - rangeMs[input.range]);
+      return prisma.nodeStatusEvent.findMany({
+        where: { nodeId: input.nodeId, timestamp: { gte: since } },
+        orderBy: { timestamp: "asc" },
+      });
+    }),
+
+  // Computes the share of the range the node spent HEALTHY, plus the number of
+  // transitions into UNREACHABLE or DEGRADED ("incidents") inside the range.
+  getUptime: protectedProcedure
+    .input(z.object({
+      nodeId: z.string(),
+      range: z.enum(["1d", "7d", "30d"]),
+    }))
+    .use(withTeamAccess("VIEWER"))
+    .query(async ({ input }) => {
+      const rangeMs = {
+        "1d": 24 * 60 * 60 * 1000,
+        "7d": 7 * 24 * 60 * 60 * 1000,
+        "30d": 30 * 24 * 60 * 60 * 1000,
+      } as const;
+      const now = Date.now();
+      const since = new Date(now - rangeMs[input.range]);
+
+      // The in-range events and the last event before the range (which gives the
+      // starting status) are independent lookups, so fetch them in parallel.
+      const [events, priorEvent] = await Promise.all([
+        prisma.nodeStatusEvent.findMany({
+          where: { nodeId: input.nodeId, timestamp: { gte: since } },
+          orderBy: { timestamp: "asc" },
+        }),
+        prisma.nodeStatusEvent.findFirst({
+          where: { nodeId: input.nodeId, timestamp: { lt: since } },
+          orderBy: { timestamp: "desc" },
+        }),
+      ]);
+
+      // With no event before the range, nothing is known about the node until its
+      // first event in the range. Start measuring there so the time before it is
+      // not counted as downtime. A node with no events at all keeps the full
+      // range and reports 0%.
+      const windowStart = priorEvent == null && events.length > 0
+        ? events[0].timestamp.getTime()
+        : since.getTime();
+      const totalSeconds = Math.max(0, (now - windowStart) / 1000);
+
+      // Walk events, tracking time in HEALTHY status
+      let healthySeconds = 0;
+      let incidents = 0;
+      let currentStatus = priorEvent?.toStatus ?? "UNKNOWN";
+      let cursor = windowStart;
+
+      for (const event of events) {
+        const eventTime = event.timestamp.getTime();
+        if (currentStatus === "HEALTHY") {
+          healthySeconds += (eventTime - cursor) / 1000;
+        }
+        if (event.toStatus === "UNREACHABLE" || event.toStatus === "DEGRADED") {
+          incidents++;
+        }
+        currentStatus = event.toStatus;
+        cursor = eventTime;
+      }
+
+      // Account for time from last event to now
+      if (currentStatus === "HEALTHY") {
+        healthySeconds += (now - cursor) / 1000;
+      }
+
+      // Clamp so that clock skew between stored event timestamps and this
+      // process cannot push the result outside 0–100.
+      const healthyRatio = totalSeconds > 0 ? healthySeconds / totalSeconds : 0;
+      const uptimePercent = Math.round(Math.min(1, Math.max(0, healthyRatio)) * 10000) / 100;
+
+      return {
+        uptimePercent,
+        totalSeconds: Math.round(totalSeconds),
+        healthySeconds: Math.round(healthySeconds),
+        incidents,
+      };
     }),
 
   create: protectedProcedure
diff --git a/src/server/services/fleet-health.ts b/src/server/services/fleet-health.ts
index 81ee0243..d81cf9da 100644
--- a/src/server/services/fleet-health.ts
+++ b/src/server/services/fleet-health.ts
@@ -21,17 +21,35 @@ export async function checkNodeHealth(): Promise {
       lastHeartbeat: { lt: maxAge },
       status: { not: "UNREACHABLE" },
     },
-    select: { id: true, name: true, environmentId: true },
+    select: { id: true, name: true, environmentId: true, status: true },
   });
 
-  await prisma.vectorNode.updateMany({
-    where: {
-      nodeTokenHash: { not: null },
-      lastHeartbeat: { lt: maxAge },
-      status: { not: "UNREACHABLE" },
-    },
-    data: { status: "UNREACHABLE" },
-  });
+  if (goingUnreachable.length > 0) {
+    // Restrict the update to the nodes selected above. Re-running the bare
+    // staleness filter could also flip nodes that went stale after the select,
+    // leaving them UNREACHABLE with no status event and no alert; such nodes
+    // still match the select on a later checkNodeHealth run.
+    const staleNodeIds = goingUnreachable.map((node) => node.id);
+    await prisma.$transaction(async (tx) => {
+      await tx.nodeStatusEvent.createMany({
+        data: goingUnreachable.map((node) => ({
+          nodeId: node.id,
+          fromStatus: node.status,
+          toStatus: "UNREACHABLE",
+          reason: "heartbeat timeout",
+        })),
+      });
+      await tx.vectorNode.updateMany({
+        where: {
+          id: { in: staleNodeIds },
+          nodeTokenHash: { not: null },
+          lastHeartbeat: { lt: maxAge },
+          status: { not: "UNREACHABLE" },
+        },
+        data: { status: "UNREACHABLE" },
+      });
+    });
+  }
 
   for (const node of goingUnreachable) {
     void fireEventAlert("node_left", node.environmentId, {
diff --git a/src/server/services/metrics-cleanup.ts b/src/server/services/metrics-cleanup.ts
index 490d4429..c765f539 100644
--- a/src/server/services/metrics-cleanup.ts
+++ b/src/server/services/metrics-cleanup.ts
@@ -16,7 +16,19 @@ export async function cleanupOldMetrics(): Promise {
   const metricsCutoff = new Date(Date.now() - metricsRetentionDays * 24 * 60 * 60 * 1000);
   const logsCutoff = new Date(Date.now() - logsRetentionDays * 24 * 60 * 60 * 1000);
 
+  // Keep each node's newest expired status event. fleet.getUptime reads the last
+  // event before its range to learn the starting status, so deleting every old
+  // event would make a node that has been stable for longer than the retention
+  // period look UNKNOWN (0% uptime).
+  const baselineStatusEvents = await prisma.nodeStatusEvent.findMany({
+    where: { timestamp: { lt: metricsCutoff } },
+    distinct: ["nodeId"],
+    orderBy: { timestamp: "desc" },
+    select: { id: true },
+  });
+  const baselineStatusEventIds = baselineStatusEvents.map((event) => event.id);
+
-  const [pipelineResult, nodeResult, logsResult] = await Promise.all([
+  const [pipelineResult, nodeResult, logsResult, statusEventsResult] = await Promise.all([
     prisma.pipelineMetric.deleteMany({
       where: { timestamp: { lt: metricsCutoff } },
     }),
@@ -26,7 +38,13 @@ export async function cleanupOldMetrics(): Promise {
     prisma.pipelineLog.deleteMany({
       where: { timestamp: { lt: logsCutoff } },
     }),
+    prisma.nodeStatusEvent.deleteMany({
+      where: {
+        timestamp: { lt: metricsCutoff },
+        id: { notIn: baselineStatusEventIds },
+      },
+    }),
   ]);
 
-  return pipelineResult.count + nodeResult.count + logsResult.count;
+  return pipelineResult.count + nodeResult.count + logsResult.count + statusEventsResult.count;
 }