From 370f678a75576696d510a02d90b320685a854e9c Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:33:04 +0000 Subject: [PATCH 01/23] chore: add ws dependency for WebSocket push channel --- package.json | 2 ++ pnpm-lock.yaml | 27 +++++++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/package.json b/package.json index 0fe1381..081190f 100644 --- a/package.json +++ b/package.json @@ -50,6 +50,7 @@ "sonner": "^2.0.7", "superjson": "^2.2.6", "tailwind-merge": "^3.5.0", + "ws": "^8.19.0", "zod": "^4.3.6", "zustand": "^5.0.11" }, @@ -74,6 +75,7 @@ "@types/nodemailer": "^7.0.11", "@types/react": "^19", "@types/react-dom": "^19", + "@types/ws": "^8.18.1", "eslint": "^9", "eslint-config-next": "16.1.6", "prisma": "^7.4.2", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 11ee796..ee8353f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -129,6 +129,9 @@ importers: tailwind-merge: specifier: ^3.5.0 version: 3.5.0 + ws: + specifier: ^8.19.0 + version: 8.19.0 zod: specifier: ^4.3.6 version: 4.3.6 @@ -163,6 +166,9 @@ importers: '@types/react-dom': specifier: ^19 version: 19.2.3(@types/react@19.2.14) + '@types/ws': + specifier: ^8.18.1 + version: 8.18.1 eslint: specifier: ^9 version: 9.39.3(jiti@2.6.1) @@ -1950,6 +1956,9 @@ packages: '@types/validate-npm-package-name@4.0.2': resolution: {integrity: sha512-lrpDziQipxCEeK5kWxvljWYhUvOiB2A9izZd9B2AFarYAkqZshb4lPbRs7zKEic6eGtH8V/2qJW+dPp9OtF6bw==} + '@types/ws@8.18.1': + resolution: {integrity: sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==} + '@typescript-eslint/eslint-plugin@8.56.1': resolution: {integrity: sha512-Jz9ZztpB37dNC+HU2HI28Bs9QXpzCz+y/twHOwhyrIRdbuVDxSytJNDl6z/aAKlaRIwC7y8wJdkBv7FxYGgi0A==} engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} @@ -4752,6 +4761,18 @@ packages: wrappy@1.0.2: resolution: {integrity: sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==} + ws@8.19.0: + resolution: 
{integrity: sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==} + engines: {node: '>=10.0.0'} + peerDependencies: + bufferutil: ^4.0.1 + utf-8-validate: '>=5.0.2' + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + wsl-utils@0.3.1: resolution: {integrity: sha512-g/eziiSUNBSsdDJtCLB8bdYEUMj4jR7AGeUo96p/3dTafgjHhpF4RiCFPiRILwjQoDXx5MqkBr4fwWtR3Ky4Wg==} engines: {node: '>=20'} @@ -6608,6 +6629,10 @@ snapshots: '@types/validate-npm-package-name@4.0.2': {} + '@types/ws@8.18.1': + dependencies: + '@types/node': 20.19.35 + '@typescript-eslint/eslint-plugin@8.56.1(@typescript-eslint/parser@8.56.1(eslint@9.39.3(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.3(jiti@2.6.1))(typescript@5.9.3)': dependencies: '@eslint-community/regexpp': 4.12.2 @@ -9748,6 +9773,8 @@ snapshots: wrappy@1.0.2: {} + ws@8.19.0: {} + wsl-utils@0.3.1: dependencies: is-wsl: 3.1.1 From 110f81d2b6de57bbd067b88ec706839e9c11e735 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:33:12 +0000 Subject: [PATCH 02/23] feat: define push message types for WebSocket protocol --- src/server/services/ws-types.ts | 45 +++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 src/server/services/ws-types.ts diff --git a/src/server/services/ws-types.ts b/src/server/services/ws-types.ts new file mode 100644 index 0000000..3f5dac1 --- /dev/null +++ b/src/server/services/ws-types.ts @@ -0,0 +1,45 @@ +// src/server/services/ws-types.ts + +/** + * Server→Agent push message types sent over WebSocket. + * Each message has a `type` discriminator for client-side dispatch. + * + * Config changes use lightweight notifications (config_changed) that trigger + * an immediate re-poll, rather than carrying the full assembled config. + * This avoids duplicating secret/cert resolution logic from the config endpoint. 
+ */ +export type PushMessage = + | ConfigChangedMessage + | SampleRequestMessage + | ActionMessage + | PollIntervalMessage; + +/** Notification that pipeline config has changed. Agent should re-poll immediately. */ +export interface ConfigChangedMessage { + type: "config_changed"; + /** Optional: which pipeline changed. If absent, agent re-polls all. */ + pipelineId?: string; + /** Reason for the change (deploy, undeploy, maintenance). For logging only. */ + reason?: string; +} + +export interface SampleRequestMessage { + type: "sample_request"; + requestId: string; + pipelineId: string; + componentKeys: string[]; + limit: number; +} + +export interface ActionMessage { + type: "action"; + action: "self_update" | "restart"; + targetVersion?: string; + downloadUrl?: string; + checksum?: string; +} + +export interface PollIntervalMessage { + type: "poll_interval"; + intervalMs: number; +} From 9197cbd9e849b17d838f801e41fb80fe7164039e Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:34:03 +0000 Subject: [PATCH 03/23] feat: add in-memory WebSocket connection registry --- src/server/services/ws-registry.ts | 56 ++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 src/server/services/ws-registry.ts diff --git a/src/server/services/ws-registry.ts b/src/server/services/ws-registry.ts new file mode 100644 index 0000000..b20fd69 --- /dev/null +++ b/src/server/services/ws-registry.ts @@ -0,0 +1,56 @@ +import type WebSocket from "ws"; +import type { PushMessage } from "./ws-types"; + +class WsRegistry { + private connections = new Map(); + + register(nodeId: string, ws: WebSocket): void { + const existing = this.connections.get(nodeId); + if (existing && existing.readyState === existing.OPEN) { + existing.close(1000, "replaced"); + } + this.connections.set(nodeId, ws); + } + + unregister(nodeId: string): void { + this.connections.delete(nodeId); + } + + send(nodeId: string, message: PushMessage): boolean { + const ws = 
this.connections.get(nodeId); + if (!ws || ws.readyState !== ws.OPEN) { + return false; + } + try { + ws.send(JSON.stringify(message)); + return true; + } catch { + return false; + } + } + + broadcast(nodeIds: string[], message: PushMessage): string[] { + const sent: string[] = []; + for (const nodeId of nodeIds) { + if (this.send(nodeId, message)) { + sent.push(nodeId); + } + } + return sent; + } + + isConnected(nodeId: string): boolean { + const ws = this.connections.get(nodeId); + return ws !== undefined && ws.readyState === ws.OPEN; + } + + get size(): number { + let count = 0; + for (const [, ws] of this.connections) { + if (ws.readyState === ws.OPEN) count++; + } + return count; + } +} + +export const wsRegistry = new WsRegistry(); From ed4036084fa1b0e3a477af77e3836ada7614f684 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:34:08 +0000 Subject: [PATCH 04/23] feat: add WebSocket upgrade authentication helper --- src/server/services/ws-auth.ts | 36 ++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 src/server/services/ws-auth.ts diff --git a/src/server/services/ws-auth.ts b/src/server/services/ws-auth.ts new file mode 100644 index 0000000..d9cfe1a --- /dev/null +++ b/src/server/services/ws-auth.ts @@ -0,0 +1,36 @@ +import type { IncomingMessage } from "http"; +import { prisma } from "@/lib/prisma"; +import { extractBearerToken, verifyNodeToken } from "./agent-token"; + +/** + * Authenticate a WebSocket upgrade request by verifying its Bearer token + * against all node tokens. + * + * Returns the matching node and environment IDs, or null if authentication fails. + */ +export async function authenticateWsUpgrade( + req: IncomingMessage, +): Promise<{ nodeId: string; environmentId: string } | null> { + const authHeader = req.headers["authorization"]; + const token = extractBearerToken( + Array.isArray(authHeader) ? authHeader[0] : authHeader ?? 
null, + ); + if (!token) { + return null; + } + + const nodes = await prisma.vectorNode.findMany({ + where: { nodeTokenHash: { not: null } }, + select: { id: true, environmentId: true, nodeTokenHash: true }, + }); + + for (const node of nodes) { + if (!node.nodeTokenHash) continue; + const valid = await verifyNodeToken(token, node.nodeTokenHash); + if (valid) { + return { nodeId: node.id, environmentId: node.environmentId }; + } + } + + return null; +} From b559f35b7530086de1909f899f42d65b2c784b94 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:35:21 +0000 Subject: [PATCH 05/23] feat: custom server.ts with WebSocket upgrade handler and keepalive --- package.json | 2 +- server.ts | 99 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 server.ts diff --git a/package.json b/package.json index 081190f..38ba449 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "private": true, "packageManager": "pnpm@10.13.1", "scripts": { - "dev": "next dev", + "dev": "tsx server.ts", "build": "next build", "start": "next start", "lint": "eslint", diff --git a/server.ts b/server.ts new file mode 100644 index 0000000..86e73c1 --- /dev/null +++ b/server.ts @@ -0,0 +1,99 @@ +import { createServer, type IncomingMessage } from "http"; +import type { Socket } from "net"; +import next from "next"; +import { parse } from "url"; +import { WebSocketServer } from "ws"; + +const dev = process.env.NODE_ENV !== "production"; +const hostname = process.env.HOSTNAME || "0.0.0.0"; +const port = parseInt(process.env.PORT || "3000", 10); + +const PING_INTERVAL_MS = 30_000; +const PONG_TIMEOUT_MS = 10_000; +const WS_PATH = "/api/agent/ws"; + +const app = next({ dev, hostname, port }); +const handle = app.getRequestHandler(); + +app.prepare().then(async () => { + const { authenticateWsUpgrade } = await import("./src/server/services/ws-auth"); + const { wsRegistry } = await 
import("./src/server/services/ws-registry"); + + const server = createServer((req, res) => { + const parsedUrl = parse(req.url ?? "/", true); + handle(req, res, parsedUrl); + }); + + const wss = new WebSocketServer({ noServer: true }); + + server.on("upgrade", async (req: IncomingMessage, socket: Socket, head: Buffer) => { + const { pathname } = parse(req.url ?? "/", true); + + if (pathname !== WS_PATH) { + socket.destroy(); + return; + } + + const agent = await authenticateWsUpgrade(req); + if (!agent) { + socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n"); + socket.destroy(); + return; + } + + wss.handleUpgrade(req, socket, head, (ws) => { + wss.emit("connection", ws, req, agent); + }); + }); + + wss.on("connection", (ws, _req, agent: { nodeId: string; environmentId: string }) => { + const { nodeId } = agent; + console.log(`[ws] agent connected: ${nodeId}`); + wsRegistry.register(nodeId, ws); + + let alive = true; + let pongTimer: ReturnType | null = null; + + ws.on("pong", () => { + alive = true; + if (pongTimer) { + clearTimeout(pongTimer); + pongTimer = null; + } + }); + + const pingInterval = setInterval(() => { + if (!alive) { + console.log(`[ws] agent ${nodeId} did not respond to ping, terminating`); + ws.terminate(); + return; + } + alive = false; + ws.ping(); + pongTimer = setTimeout(() => { + if (!alive) { + console.log(`[ws] agent ${nodeId} pong timeout (${PONG_TIMEOUT_MS}ms), terminating`); + ws.terminate(); + } + }, PONG_TIMEOUT_MS); + }, PING_INTERVAL_MS); + + ws.on("close", () => { + console.log(`[ws] agent disconnected: ${nodeId}`); + clearInterval(pingInterval); + if (pongTimer) clearTimeout(pongTimer); + wsRegistry.unregister(nodeId); + }); + + ws.on("error", (err) => { + console.error(`[ws] error for agent ${nodeId}:`, err.message); + clearInterval(pingInterval); + if (pongTimer) clearTimeout(pongTimer); + wsRegistry.unregister(nodeId); + }); + }); + + server.listen(port, hostname, () => { + console.log(`> Ready on http://${hostname}:${port}`); + 
}); +}); From 9cbf3027cd95be41a8ea2ada4eedb0daea701566 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:36:49 +0000 Subject: [PATCH 06/23] chore: add gorilla/websocket dependency for agent WebSocket client --- agent/go.mod | 2 ++ agent/go.sum | 2 ++ 2 files changed, 4 insertions(+) create mode 100644 agent/go.sum diff --git a/agent/go.mod b/agent/go.mod index a75086f..174bc3b 100644 --- a/agent/go.mod +++ b/agent/go.mod @@ -1,3 +1,5 @@ module github.com/TerrifiedBug/vectorflow/agent go 1.22 + +require github.com/gorilla/websocket v1.5.3 // indirect diff --git a/agent/go.sum b/agent/go.sum new file mode 100644 index 0000000..25a9fc4 --- /dev/null +++ b/agent/go.sum @@ -0,0 +1,2 @@ +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= From 474646e1b41583a0fb70766eb1c158006e49fc5d Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:36:55 +0000 Subject: [PATCH 07/23] feat: define Go types for WebSocket push messages --- agent/internal/ws/types.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 agent/internal/ws/types.go diff --git a/agent/internal/ws/types.go b/agent/internal/ws/types.go new file mode 100644 index 0000000..a7a0feb --- /dev/null +++ b/agent/internal/ws/types.go @@ -0,0 +1,29 @@ +package ws + +// PushMessage is the envelope for all server→agent push messages. +// The Type field determines which concrete fields are populated. 
+// +// Fields are shared across message types: +// - PipelineID: used by config_changed, sample_request +// - Checksum: used by action (self_update) +type PushMessage struct { + Type string `json:"type"` + + // config_changed fields + PipelineID string `json:"pipelineId,omitempty"` + Reason string `json:"reason,omitempty"` + + // sample_request fields + RequestID string `json:"requestId,omitempty"` + ComponentKeys []string `json:"componentKeys,omitempty"` + Limit int `json:"limit,omitempty"` + + // action fields + Action string `json:"action,omitempty"` + TargetVersion string `json:"targetVersion,omitempty"` + DownloadURL string `json:"downloadUrl,omitempty"` + Checksum string `json:"checksum,omitempty"` + + // poll_interval fields + IntervalMs int `json:"intervalMs,omitempty"` +} From 1a9816652d787e917dd40b5ce1ff08030136f700 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:36:59 +0000 Subject: [PATCH 08/23] feat: dedicated /api/agent/samples endpoint for sample results --- src/app/api/agent/samples/route.ts | 104 +++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 src/app/api/agent/samples/route.ts diff --git a/src/app/api/agent/samples/route.ts b/src/app/api/agent/samples/route.ts new file mode 100644 index 0000000..b0a0e16 --- /dev/null +++ b/src/app/api/agent/samples/route.ts @@ -0,0 +1,104 @@ +import { NextResponse } from "next/server"; +import { prisma } from "@/lib/prisma"; +import { authenticateAgent } from "@/server/services/agent-auth"; +import { z } from "zod"; + +const sampleResultSchema = z.object({ + results: z.array( + z.object({ + requestId: z.string(), + componentKey: z.string(), + events: z.array(z.unknown()).optional().default([]), + schema: z + .array( + z.object({ + path: z.string(), + type: z.string(), + sample: z.string(), + }), + ) + .optional() + .default([]), + error: z.string().optional(), + }), + ), +}); + +export async function POST(request: Request) { + const agent = await 
authenticateAgent(request); + if (!agent) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + + try { + const body = await request.json(); + const parsed = sampleResultSchema.safeParse(body); + if (!parsed.success) { + return NextResponse.json( + { error: "Invalid payload", details: parsed.error.issues }, + { status: 400 }, + ); + } + + const { results } = parsed.data; + + for (const result of results) { + const sampleRequest = await prisma.eventSampleRequest.findUnique({ + where: { id: result.requestId }, + }); + if (!sampleRequest || sampleRequest.status !== "PENDING") { + continue; + } + + if (result.error) { + await prisma.eventSampleRequest.update({ + where: { id: result.requestId }, + data: { status: "ERROR", completedAt: new Date(), nodeId: agent.nodeId }, + }); + await prisma.eventSample.create({ + data: { + requestId: result.requestId, + pipelineId: sampleRequest.pipelineId, + componentKey: result.componentKey, + events: [], + schema: [], + error: result.error, + }, + }); + } else { + await prisma.eventSample.create({ + data: { + requestId: result.requestId, + pipelineId: sampleRequest.pipelineId, + componentKey: result.componentKey, + events: result.events as object[], + schema: result.schema, + }, + }); + + const componentKeys = sampleRequest.componentKeys as string[]; + const completedKeys = await prisma.eventSample.findMany({ + where: { requestId: result.requestId }, + select: { componentKey: true }, + }); + const completedKeySet = new Set(completedKeys.map((s) => s.componentKey)); + const allDone = componentKeys.every((k) => completedKeySet.has(k)); + + if (allDone) { + await prisma.eventSampleRequest.update({ + where: { id: result.requestId }, + data: { status: "COMPLETED", completedAt: new Date(), nodeId: agent.nodeId }, + }); + } + } + } + + return NextResponse.json({ ok: true }); + } catch (error) { + console.error("Sample results error:", error); + return NextResponse.json( + { error: "Failed to process sample results" 
}, + { status: 500 }, + ); + } +} From 78cd4389340a812de6ffcec53e40de4c9169ddeb Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:37:03 +0000 Subject: [PATCH 09/23] feat: include websocketUrl in agent config response --- src/app/api/agent/config/route.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/app/api/agent/config/route.ts b/src/app/api/agent/config/route.ts index 822f906..f6b6fa8 100644 --- a/src/app/api/agent/config/route.ts +++ b/src/app/api/agent/config/route.ts @@ -196,9 +196,18 @@ export async function GET(request: Request) { select: { fleetPollIntervalMs: true }, }); + // Build WebSocket URL from the incoming request's host + const proto = request.headers.get("x-forwarded-proto") ?? "http"; + const wsProto = proto === "https" ? "wss" : "ws"; + const host = request.headers.get("x-forwarded-host") + ?? request.headers.get("host") + ?? `localhost:${process.env.PORT ?? 3000}`; + const websocketUrl = `${wsProto}://${host}/api/agent/ws`; + return NextResponse.json({ pipelines: pipelineConfigs, pollIntervalMs: settings?.fleetPollIntervalMs ?? 15000, + websocketUrl, secretBackend: environment.secretBackend, ...(environment.secretBackend !== "BUILTIN" ? 
{ secretBackendConfig: environment.secretBackendConfig } From 6cb38b328116979eda9827958678a972e2595cfb Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:40:48 +0000 Subject: [PATCH 10/23] feat: compile custom server.ts in Docker build for WebSocket support --- docker/server/Dockerfile | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docker/server/Dockerfile b/docker/server/Dockerfile index 84960ce..0479327 100644 --- a/docker/server/Dockerfile +++ b/docker/server/Dockerfile @@ -31,8 +31,15 @@ COPY prisma.config.ts ./prisma.config.ts RUN npx prisma generate COPY src ./src COPY public ./public +COPY server.ts ./server.ts RUN --mount=type=cache,target=/app/.next/cache \ pnpm build +# Bundle custom server.ts → server.js (replaces Next.js default standalone server.js) +# Uses --alias to resolve @/ imports, --packages=external keeps npm deps external +RUN npx esbuild server.ts --bundle --platform=node --target=node22 \ + --outfile=.next/standalone/server.js \ + --alias:@/=./src/ \ + --packages=external # Save Prisma client version for the runner stage (standalone strips node_modules) RUN node -e "console.log(require('./node_modules/@prisma/client/package.json').version)" > /tmp/prisma-version From 1485f515e4f218e5e16effd1a6d320d87aac1423 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:41:48 +0000 Subject: [PATCH 11/23] test: WebSocket client message parsing and reconnect tests --- agent/internal/ws/client_test.go | 211 +++++++++++++++++++++++++++++++ 1 file changed, 211 insertions(+) create mode 100644 agent/internal/ws/client_test.go diff --git a/agent/internal/ws/client_test.go b/agent/internal/ws/client_test.go new file mode 100644 index 0000000..e9afe01 --- /dev/null +++ b/agent/internal/ws/client_test.go @@ -0,0 +1,211 @@ +// agent/internal/ws/client_test.go +package ws + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/gorilla/websocket" +) + 
+func TestPushMessageParsing(t *testing.T) { + tests := []struct { + name string + json string + wantType string + }{ + { + name: "config_changed", + json: `{"type":"config_changed","pipelineId":"p1","reason":"deploy"}`, + wantType: "config_changed", + }, + { + name: "config_changed_no_pipeline", + json: `{"type":"config_changed","reason":"maintenance_on"}`, + wantType: "config_changed", + }, + { + name: "sample_request", + json: `{"type":"sample_request","requestId":"r1","pipelineId":"p1","componentKeys":["source_in"],"limit":5}`, + wantType: "sample_request", + }, + { + name: "action_self_update", + json: `{"type":"action","action":"self_update","targetVersion":"v1.2.3","downloadUrl":"https://example.com/agent","checksum":"sha256:abc"}`, + wantType: "action", + }, + { + name: "poll_interval", + json: `{"type":"poll_interval","intervalMs":5000}`, + wantType: "poll_interval", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var msg PushMessage + if err := json.Unmarshal([]byte(tt.json), &msg); err != nil { + t.Fatalf("unmarshal error: %v", err) + } + if msg.Type != tt.wantType { + t.Errorf("got type %q, want %q", msg.Type, tt.wantType) + } + }) + } +} + +func TestClientConnectsAndReceivesMessages(t *testing.T) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + } + + var serverConn *websocket.Conn + var mu sync.Mutex + connected := make(chan struct{}, 1) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify auth header + auth := r.Header.Get("Authorization") + if auth != "Bearer test-token" { + http.Error(w, "unauthorized", 401) + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Logf("upgrade error: %v", err) + return + } + mu.Lock() + serverConn = conn + mu.Unlock() + select { + case connected <- struct{}{}: + default: + } + })) + defer server.Close() + + wsURL := "ws" + strings.TrimPrefix(server.URL, 
"http") + + var received []PushMessage + var recvMu sync.Mutex + msgCh := make(chan struct{}, 1) + + client := New(wsURL, "test-token", func(msg PushMessage) { + recvMu.Lock() + received = append(received, msg) + recvMu.Unlock() + select { + case msgCh <- struct{}{}: + default: + } + }) + + go client.Connect() + defer client.Close() + + // Wait for connection (channel-based, not sleep) + select { + case <-connected: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for connection") + } + + // Send a message from server + mu.Lock() + conn := serverConn + mu.Unlock() + + if conn == nil { + t.Fatal("server did not receive connection") + } + + msg := PushMessage{Type: "config_changed", PipelineID: "p1", Reason: "deploy"} + data, _ := json.Marshal(msg) + if err := conn.WriteMessage(websocket.TextMessage, data); err != nil { + t.Fatalf("write error: %v", err) + } + + // Wait for message to be received (channel-based) + select { + case <-msgCh: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for message") + } + + recvMu.Lock() + defer recvMu.Unlock() + if len(received) != 1 { + t.Fatalf("expected 1 message, got %d", len(received)) + } + if received[0].Type != "config_changed" { + t.Errorf("got type %q, want config_changed", received[0].Type) + } + if received[0].PipelineID != "p1" { + t.Errorf("got pipelineId %q, want p1", received[0].PipelineID) + } +} + +func TestClientReconnectsAfterDisconnect(t *testing.T) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + } + + var connections int + var connMu sync.Mutex + reconnected := make(chan struct{}, 1) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + connMu.Lock() + connections++ + count := connections + connMu.Unlock() + + if count == 1 { + // Close first connection to trigger reconnect + time.Sleep(100 * time.Millisecond) 
+ conn.Close() + } else { + // Signal reconnect happened + select { + case reconnected <- struct{}{}: + default: + } + // Keep second connection open + for { + if _, _, err := conn.ReadMessage(); err != nil { + return + } + } + } + })) + defer server.Close() + + wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + client := New(wsURL, "", func(msg PushMessage) {}) + go client.Connect() + defer client.Close() + + // Wait for reconnect (channel-based, not sleep) + select { + case <-reconnected: + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for reconnect") + } + + connMu.Lock() + defer connMu.Unlock() + if connections < 2 { + t.Errorf("expected at least 2 connections (reconnect), got %d", connections) + } +} From 5cb4ef445dc29c034cf9940ba6d54c319d48eed7 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:42:15 +0000 Subject: [PATCH 12/23] feat: WebSocket client with exponential backoff reconnect --- agent/internal/ws/client.go | 143 ++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 agent/internal/ws/client.go diff --git a/agent/internal/ws/client.go b/agent/internal/ws/client.go new file mode 100644 index 0000000..817e490 --- /dev/null +++ b/agent/internal/ws/client.go @@ -0,0 +1,143 @@ +// agent/internal/ws/client.go +package ws + +import ( + "encoding/json" + "log/slog" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +// Client manages a persistent WebSocket connection to the VectorFlow server. +type Client struct { + url string + token string + onMessage func(PushMessage) + + mu sync.Mutex + conn *websocket.Conn + done chan struct{} +} + +// New creates a WebSocket client. Call Connect() to start. +func New(url, token string, onMessage func(PushMessage)) *Client { + return &Client{ + url: url, + token: token, + onMessage: onMessage, + done: make(chan struct{}), + } +} + +// Connect establishes the WebSocket connection with exponential backoff retry. 
+// Blocks until Close() is called. After connecting, starts a read loop that +// calls onMessage for each received message. +func (c *Client) Connect() { + backoff := time.Second + maxBackoff := 30 * time.Second + + for { + select { + case <-c.done: + return + default: + } + + dialer := websocket.Dialer{ + HandshakeTimeout: 10 * time.Second, + } + header := http.Header{} + if c.token != "" { + header.Set("Authorization", "Bearer "+c.token) + } + + conn, _, err := dialer.Dial(c.url, header) + if err != nil { + slog.Warn("websocket connect failed, retrying", "url", c.url, "backoff", backoff, "error", err) + select { + case <-time.After(backoff): + case <-c.done: + return + } + backoff = backoff * 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + continue + } + + // Connected — reset backoff + backoff = time.Second + slog.Info("websocket connected", "url", c.url) + + c.mu.Lock() + c.conn = conn + c.mu.Unlock() + + // Set read deadline so silent network drops are detected. + // Reset on every pong. Ping interval is 30s + 10s timeout = 40s. 
+ conn.SetReadDeadline(time.Now().Add(45 * time.Second)) + conn.SetPongHandler(func(string) error { + conn.SetReadDeadline(time.Now().Add(45 * time.Second)) + return nil + }) + + // Read loop — blocks until connection drops + c.readLoop(conn) + + c.mu.Lock() + c.conn = nil + c.mu.Unlock() + + slog.Warn("websocket disconnected, reconnecting", "backoff", backoff) + } +} + +func (c *Client) readLoop(conn *websocket.Conn) { + for { + _, data, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + slog.Warn("websocket read error", "error", err) + } + return + } + + // Reset read deadline on any received message + conn.SetReadDeadline(time.Now().Add(45 * time.Second)) + + var msg PushMessage + if err := json.Unmarshal(data, &msg); err != nil { + slog.Warn("websocket message parse error", "error", err, "data", string(data)) + continue + } + + slog.Debug("websocket message received", "type", msg.Type) + c.onMessage(msg) + } +} + +// Close gracefully shuts down the WebSocket connection. 
+func (c *Client) Close() { + select { + case <-c.done: + return // already closed + default: + close(c.done) + } + + c.mu.Lock() + conn := c.conn + c.mu.Unlock() + + if conn != nil { + conn.WriteMessage( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), + ) + conn.Close() + } +} From 245c2da8a34bcf26ffaded677db23b0f2b7e3944 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:42:28 +0000 Subject: [PATCH 13/23] feat: push WebSocket notifications from deploy, sample, fleet actions --- src/server/routers/deploy.ts | 20 +++++++++++++++++++- src/server/routers/fleet.ts | 24 ++++++++++++++++++++++-- src/server/routers/pipeline.ts | 16 ++++++++++++++++ src/server/services/deploy-agent.ts | 23 +++++++++++++++++++++++ 4 files changed, 80 insertions(+), 3 deletions(-) diff --git a/src/server/routers/deploy.ts b/src/server/routers/deploy.ts index 342d6f9..d29ac2b 100644 --- a/src/server/routers/deploy.ts +++ b/src/server/routers/deploy.ts @@ -10,6 +10,7 @@ import { decryptNodeConfig } from "@/server/services/config-crypto"; import { withAudit } from "@/server/middleware/audit"; import { writeAuditLog } from "@/server/services/audit"; import { fireEventAlert } from "@/server/services/event-alerts"; +import { wsRegistry } from "@/server/services/ws-registry"; export const deployRouter = router({ preview: protectedProcedure @@ -267,7 +268,24 @@ export const deployRouter = router({ throw new TRPCError({ code: "NOT_FOUND", message: "Pipeline not found" }); } - return undeployAgent(input.pipelineId); + const result = await undeployAgent(input.pipelineId); + + // Notify connected agents that config has changed + if (result.success) { + const nodes = await prisma.vectorNode.findMany({ + where: { environmentId: pipeline.environmentId }, + select: { id: true }, + }); + for (const node of nodes) { + wsRegistry.send(node.id, { + type: "config_changed", + pipelineId: input.pipelineId, + reason: "undeploy", + }); + } + } + + 
return result; }), environmentInfo: protectedProcedure diff --git a/src/server/routers/fleet.ts b/src/server/routers/fleet.ts index 0c4225c..9bcfad6 100644 --- a/src/server/routers/fleet.ts +++ b/src/server/routers/fleet.ts @@ -5,6 +5,7 @@ import { prisma } from "@/lib/prisma"; import { LogLevel } from "@/generated/prisma"; import { withAudit } from "@/server/middleware/audit"; import { checkDevAgentVersion } from "@/server/services/version-check"; +import { wsRegistry } from "@/server/services/ws-registry"; export const fleetRouter = router({ list: protectedProcedure @@ -279,7 +280,7 @@ export const fleetRouter = router({ checksum = `sha256:${freshChecksum}`; } - return prisma.vectorNode.update({ + const updated = await prisma.vectorNode.update({ where: { id: input.nodeId }, data: { pendingAction: { @@ -290,6 +291,17 @@ export const fleetRouter = router({ }, }, }); + + // Push action to agent via WebSocket (fallback: agent reads pendingAction on next poll) + wsRegistry.send(input.nodeId, { + type: "action", + action: "self_update", + targetVersion, + downloadUrl, + checksum, + }); + + return updated; }), updateLabels: protectedProcedure @@ -345,13 +357,21 @@ export const fleetRouter = router({ if (!node) { throw new TRPCError({ code: "NOT_FOUND", message: "Node not found" }); } - return prisma.vectorNode.update({ + const updated = await prisma.vectorNode.update({ where: { id: input.nodeId }, data: { maintenanceMode: input.enabled, maintenanceModeAt: input.enabled ? new Date() : null, }, }); + + // Maintenance mode changes what the config endpoint returns — notify agent to re-poll + wsRegistry.send(input.nodeId, { + type: "config_changed", + reason: input.enabled ? 
"maintenance_on" : "maintenance_off", + }); + + return updated; }), listWithPipelineStatus: protectedProcedure diff --git a/src/server/routers/pipeline.ts b/src/server/routers/pipeline.ts index 00078d9..d82f353 100644 --- a/src/server/routers/pipeline.ts +++ b/src/server/routers/pipeline.ts @@ -18,6 +18,7 @@ import { copyPipelineGraph } from "@/server/services/copy-pipeline-graph"; import { stripEnvRefs, type StrippedRef } from "@/server/services/strip-env-refs"; import { gitSyncDeletePipeline } from "@/server/services/git-sync"; import { evaluatePipelineHealth } from "@/server/services/sli-evaluator"; +import { wsRegistry } from "@/server/services/ws-registry"; /** Pipeline names must be safe identifiers */ const pipelineNameSchema = z @@ -1161,6 +1162,21 @@ export const pipelineRouter = router({ }, }); + // Push sample request to connected agents running this pipeline + const statuses = await prisma.nodePipelineStatus.findMany({ + where: { pipelineId: input.pipelineId, status: "RUNNING" }, + select: { nodeId: true }, + }); + for (const { nodeId } of statuses) { + wsRegistry.send(nodeId, { + type: "sample_request", + requestId: request.id, + pipelineId: input.pipelineId, + componentKeys: input.componentKeys, + limit: input.limit, + }); + } + return { requestId: request.id, status: "PENDING" }; }), diff --git a/src/server/services/deploy-agent.ts b/src/server/services/deploy-agent.ts index 12b2ebf..389ac52 100644 --- a/src/server/services/deploy-agent.ts +++ b/src/server/services/deploy-agent.ts @@ -6,6 +6,7 @@ import { createVersion } from "@/server/services/pipeline-version"; import { decryptNodeConfig } from "@/server/services/config-crypto"; import { startSystemVector, stopSystemVector } from "@/server/services/system-vector"; import { gitSyncCommitPipeline } from "@/server/services/git-sync"; +import { wsRegistry } from "@/server/services/ws-registry"; export interface AgentDeployResult { success: boolean; @@ -165,6 +166,28 @@ export async function 
deployAgent( await startSystemVector(version.configYaml); } + // Notify connected agents that config has changed — they will re-poll + // to get the full assembled config with secrets and certs resolved. + if (!pipeline.isSystem) { + const nodeSelector = pipeline.nodeSelector as Record<string, string> | null; + const targetNodes = await prisma.vectorNode.findMany({ + where: { environmentId: pipeline.environmentId }, + select: { id: true, labels: true }, + }); + for (const node of targetNodes) { + const labels = (node.labels as Record<string, string>) ?? {}; + const selectorEntries = Object.entries(nodeSelector ?? {}); + const matches = selectorEntries.every(([k, v]) => labels[k] === v); + if (matches) { + wsRegistry.send(node.id, { + type: "config_changed", + pipelineId, + reason: "deploy", + }); + } + } + } + return { success: true, versionId: version.id, From c99d03363127243ec9a43f4f1dd5a021e5813264 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:43:45 +0000 Subject: [PATCH 14/23] feat: add SendSampleResults method, NodeToken accessor, WebSocketURL field --- agent/internal/client/client.go | 41 +++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/agent/internal/client/client.go b/agent/internal/client/client.go index faabfb4..9fd3929 100644 --- a/agent/internal/client/client.go +++ b/agent/internal/client/client.go @@ -29,6 +29,11 @@ func (c *Client) SetNodeToken(token string) { c.nodeToken = token } +// NodeToken returns the current node token for use by other packages (e.g., WebSocket auth). 
+func (c *Client) NodeToken() string { + return c.nodeToken +} + // EnrollRequest is sent to POST /api/agent/enroll type EnrollRequest struct { Token string `json:"token"` @@ -114,6 +119,7 @@ type ConfigResponse struct { SecretBackendConfig map[string]interface{} `json:"secretBackendConfig,omitempty"` SampleRequests []SampleRequestMsg `json:"sampleRequests,omitempty"` PendingAction *PendingAction `json:"pendingAction,omitempty"` + WebSocketURL string `json:"websocketUrl,omitempty"` } func (c *Client) GetConfig() (*ConfigResponse, error) { @@ -257,3 +263,38 @@ func (c *Client) SendHeartbeat(req HeartbeatRequest) error { } return nil } + +// SampleResultsRequest is sent to POST /api/agent/samples +type SampleResultsRequest struct { + Results []SampleResultMsg `json:"results"` +} + +// SendSampleResults sends sample results directly to the dedicated samples endpoint. +func (c *Client) SendSampleResults(results []SampleResultMsg) error { + req := SampleResultsRequest{Results: results} + body, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("marshal sample results: %w", err) + } + + httpReq, err := http.NewRequest("POST", c.baseURL+"/api/agent/samples", bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("create sample results request: %w", err) + } + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("Authorization", "Bearer "+c.nodeToken) + + slog.Debug("http request", "method", "POST", "url", c.baseURL+"/api/agent/samples") + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return fmt.Errorf("sample results request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("sample results failed (status %d): %s", resp.StatusCode, string(respBody)) + } + slog.Debug("http response", "method", "POST", "url", "/api/agent/samples", "status", 200) + return nil +} From 630b9ed5dc8a2d92a8a1bd9088cf50816319f751 Mon Sep 17 00:00:00 
2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:45:52 +0000 Subject: [PATCH 15/23] feat: add poller mutex and wire WebSocket into agent main loop - Add sync.Mutex to poller for thread-safe access from WS handler - Store websocketUrl from config response - Add WebSocket client startup after first poll in Run() - Channel-based dispatch (wsCh) ensures all mutations on main goroutine - handleWsMessage: config_changed triggers re-poll, sample_request sends directly to /api/agent/samples, action triggers self-update - Debounced immediate heartbeat (1s window) after state changes --- agent/internal/agent/agent.go | 150 +++++++++++++++++++++++++++++++++ agent/internal/agent/poller.go | 18 ++++ 2 files changed, 168 insertions(+) diff --git a/agent/internal/agent/agent.go b/agent/internal/agent/agent.go index ac98d72..46c2861 100644 --- a/agent/internal/agent/agent.go +++ b/agent/internal/agent/agent.go @@ -16,6 +16,7 @@ import ( "github.com/TerrifiedBug/vectorflow/agent/internal/config" "github.com/TerrifiedBug/vectorflow/agent/internal/sampler" "github.com/TerrifiedBug/vectorflow/agent/internal/supervisor" + "github.com/TerrifiedBug/vectorflow/agent/internal/ws" ) var Version = "dev" @@ -32,6 +33,10 @@ type Agent struct { sampleResults []client.SampleResultMsg failedUpdateVersion string // skip retries for this version updateError string // report failure to server + + wsClient *ws.Client + wsCh chan ws.PushMessage + immediateHeartbeat *time.Timer } func New(cfg *config.Config) (*Agent, error) { @@ -83,18 +88,42 @@ func (a *Agent) Run() error { // Do first poll immediately a.pollAndApply() + + // Start WebSocket if the server provided a URL + a.wsCh = make(chan ws.PushMessage, 16) + if wsURL := a.poller.WebSocketURL(); wsURL != "" { + a.wsClient = ws.New(wsURL, a.client.NodeToken(), func(msg ws.PushMessage) { + // Forward messages to the main goroutine via channel + select { + case a.wsCh <- msg: + default: + slog.Warn("ws: message channel full, dropping message", 
"type", msg.Type) + } + }) + go a.wsClient.Connect() + slog.Info("websocket client started", "url", wsURL) + } + a.sendHeartbeat() for { select { case <-ctx.Done(): slog.Info("shutting down all pipelines") + if a.wsClient != nil { + a.wsClient.Close() + } + if a.immediateHeartbeat != nil { + a.immediateHeartbeat.Stop() + } a.supervisor.ShutdownAll() slog.Info("agent stopped") return nil case <-ticker.C: a.pollAndApply() a.sendHeartbeat() + case msg := <-a.wsCh: + a.handleWsMessage(msg) } } } @@ -259,3 +288,124 @@ func (a *Agent) processSampleRequests(requests []client.SampleRequestMsg) { } } } + +// handleWsMessage processes a push message from the WebSocket channel. +// MUST be called from the main goroutine (same goroutine as Run()'s select loop). +func (a *Agent) handleWsMessage(msg ws.PushMessage) { + switch msg.Type { + case "config_changed": + slog.Info("ws: config changed notification", "pipeline", msg.PipelineID, "reason", msg.Reason) + // Re-poll immediately to get the full assembled config + a.pollAndApply() + a.triggerImmediateHeartbeat() + + case "sample_request": + slog.Info("ws: sample request received", "requestId", msg.RequestID, "pipeline", msg.PipelineID) + a.processSampleRequestsAndSend([]client.SampleRequestMsg{ + { + RequestID: msg.RequestID, + PipelineID: msg.PipelineID, + ComponentKeys: msg.ComponentKeys, + Limit: msg.Limit, + }, + }) + + case "action": + slog.Info("ws: action received", "action", msg.Action) + if msg.Action == "self_update" { + a.handlePendingAction(&client.PendingAction{ + Type: "self_update", + TargetVersion: msg.TargetVersion, + DownloadURL: msg.DownloadURL, + Checksum: msg.Checksum, + }) + a.triggerImmediateHeartbeat() + } + + case "poll_interval": + if msg.IntervalMs > 0 { + slog.Info("ws: poll interval changed", "intervalMs", msg.IntervalMs) + } + + default: + slog.Warn("ws: unknown message type", "type", msg.Type) + } +} + +// triggerImmediateHeartbeat sends a heartbeat soon, debounced to 1 second. 
+// Multiple calls within 1s collapse into a single heartbeat with the latest state. +// MUST be called from the main goroutine (same goroutine as Run()'s select loop). +func (a *Agent) triggerImmediateHeartbeat() { + if a.immediateHeartbeat != nil { + a.immediateHeartbeat.Stop() + } + a.immediateHeartbeat = time.AfterFunc(time.Second, func() { + a.sendHeartbeat() + }) +} + +// processSampleRequestsAndSend processes sample requests and sends results +// directly to the /api/agent/samples endpoint (used for WebSocket-triggered requests). +// Falls back to heartbeat delivery on HTTP failure. +func (a *Agent) processSampleRequestsAndSend(requests []client.SampleRequestMsg) { + statuses := a.supervisor.Statuses() + statusMap := make(map[string]supervisor.ProcessInfo, len(statuses)) + for _, s := range statuses { + statusMap[s.PipelineID] = s + } + + for _, req := range requests { + s, found := statusMap[req.PipelineID] + if !found || s.Status != "RUNNING" || s.APIPort == 0 { + errMsg := "pipeline not running" + if !found { + errMsg = "pipeline not found" + } else if s.APIPort == 0 { + errMsg = "pipeline API port not available" + } + results := make([]client.SampleResultMsg, 0, len(req.ComponentKeys)) + for _, key := range req.ComponentKeys { + results = append(results, client.SampleResultMsg{ + RequestID: req.RequestID, + ComponentKey: key, + Error: errMsg, + }) + } + if err := a.client.SendSampleResults(results); err != nil { + slog.Warn("failed to send sample error results via dedicated endpoint", "error", err) + a.mu.Lock() + a.sampleResults = append(a.sampleResults, results...) 
+ a.mu.Unlock() + } + continue + } + + for _, key := range req.ComponentKeys { + go func(reqID string, apiPort int, componentKey string, limit int) { + result := sampler.Sample(a.cfg.VectorBin, apiPort, componentKey, limit) + result.RequestID = reqID + + msg := client.SampleResultMsg{ + RequestID: result.RequestID, + ComponentKey: result.ComponentKey, + Events: result.Events, + Error: result.Error, + } + for _, fi := range result.Schema { + msg.Schema = append(msg.Schema, client.FieldInfoMsg{ + Path: fi.Path, Type: fi.Type, Sample: fi.Sample, + }) + } + + if err := a.client.SendSampleResults([]client.SampleResultMsg{msg}); err != nil { + slog.Warn("failed to send sample results via dedicated endpoint, will retry in heartbeat", "error", err) + a.mu.Lock() + a.sampleResults = append(a.sampleResults, msg) + a.mu.Unlock() + } else { + slog.Debug("sample result sent via dedicated endpoint", "requestId", reqID, "component", componentKey) + } + }(req.RequestID, s.APIPort, key, req.Limit) + } + } +} diff --git a/agent/internal/agent/poller.go b/agent/internal/agent/poller.go index c4d5410..0731d90 100644 --- a/agent/internal/agent/poller.go +++ b/agent/internal/agent/poller.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "strings" + "sync" "github.com/TerrifiedBug/vectorflow/agent/internal/client" "github.com/TerrifiedBug/vectorflow/agent/internal/config" @@ -24,9 +25,11 @@ type pipelineState struct { type poller struct { cfg *config.Config client configFetcher + mu sync.Mutex known map[string]pipelineState // pipelineId -> last known state sampleRequests []client.SampleRequestMsg pendingAction *client.PendingAction + websocketUrl string } func newPoller(cfg *config.Config, c configFetcher) *poller { @@ -59,6 +62,9 @@ type PipelineAction struct { // Poll fetches config from VectorFlow and returns actions to take. 
func (p *poller) Poll() ([]PipelineAction, error) { + p.mu.Lock() + defer p.mu.Unlock() + resp, err := p.client.GetConfig() if err != nil { return nil, err @@ -184,6 +190,11 @@ func (p *poller) Poll() ([]PipelineAction, error) { // Store pending action (e.g. self-update) for the agent to handle p.pendingAction = resp.PendingAction + // Store websocket URL for the agent to use + if resp.WebSocketURL != "" { + p.websocketUrl = resp.WebSocketURL + } + return actions, nil } @@ -196,3 +207,10 @@ func (p *poller) SampleRequests() []client.SampleRequestMsg { func (p *poller) PendingAction() *client.PendingAction { return p.pendingAction } + +// WebSocketURL returns the WebSocket URL from the last config response. +func (p *poller) WebSocketURL() string { + p.mu.Lock() + defer p.mu.Unlock() + return p.websocketUrl +} From 06202f7ffff9b4380aa607dd652b0ea1e3d6d82b Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:47:24 +0000 Subject: [PATCH 16/23] feat: expose WebSocket status in fleet API and add WS badge to UI - Add wsConnected field to fleet list and listWithPipelineStatus responses - Show green "WS" badge next to node status when WebSocket is connected --- src/app/(dashboard)/fleet/page.tsx | 12 ++++++++++++ src/server/routers/fleet.ts | 11 +++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/app/(dashboard)/fleet/page.tsx b/src/app/(dashboard)/fleet/page.tsx index bcc404f..811ea6a 100644 --- a/src/app/(dashboard)/fleet/page.tsx +++ b/src/app/(dashboard)/fleet/page.tsx @@ -231,6 +231,18 @@ export default function FleetPage() { {nodeStatusLabel(node.status)} )} + {node.wsConnected && ( + + + + WS + + + + WebSocket connected — real-time push enabled + + + )} {formatLastSeen(node.lastSeen)} diff --git a/src/server/routers/fleet.ts b/src/server/routers/fleet.ts index 9bcfad6..2a82f09 100644 --- a/src/server/routers/fleet.ts +++ b/src/server/routers/fleet.ts @@ -12,13 +12,17 @@ export const fleetRouter = router({ .input(z.object({ 
environmentId: z.string() })) .use(withTeamAccess("VIEWER")) .query(async ({ input }) => { - return prisma.vectorNode.findMany({ + const nodes = await prisma.vectorNode.findMany({ where: { environmentId: input.environmentId }, include: { environment: { select: { id: true, name: true } }, }, orderBy: { createdAt: "desc" }, }); + return nodes.map((node) => ({ + ...node, + wsConnected: wsRegistry.isConnected(node.id), + })); }), get: protectedProcedure @@ -408,7 +412,10 @@ export const fleetRouter = router({ }); return { - nodes, + nodes: nodes.map((node) => ({ + ...node, + wsConnected: wsRegistry.isConnected(node.id), + })), deployedPipelines: deployedPipelines.map((p) => ({ id: p.id, name: p.name, From 4bc4c34643a32499943d221c73b8fab836ea31bd Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 20:53:34 +0000 Subject: [PATCH 17/23] fix: resolve data race in immediate heartbeat and poller mutex consistency - triggerImmediateHeartbeat now sends signal via channel instead of calling sendHeartbeat() directly from timer goroutine, ensuring all state access (including updateError) happens on the main goroutine - Add mutex locking to SampleRequests() and PendingAction() accessors for consistency with the thread-safety contract on poller --- agent/internal/agent/agent.go | 19 ++++++++++++++----- agent/internal/agent/poller.go | 4 ++++ 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/agent/internal/agent/agent.go b/agent/internal/agent/agent.go index 46c2861..d3dc108 100644 --- a/agent/internal/agent/agent.go +++ b/agent/internal/agent/agent.go @@ -34,9 +34,10 @@ type Agent struct { failedUpdateVersion string // skip retries for this version updateError string // report failure to server - wsClient *ws.Client - wsCh chan ws.PushMessage - immediateHeartbeat *time.Timer + wsClient *ws.Client + wsCh chan ws.PushMessage + immediateHeartbeat *time.Timer + immediateHeartbeatCh chan struct{} } func New(cfg *config.Config) (*Agent, error) { @@ -91,6 +92,7 
@@ func (a *Agent) Run() error { // Start WebSocket if the server provided a URL a.wsCh = make(chan ws.PushMessage, 16) + a.immediateHeartbeatCh = make(chan struct{}, 1) if wsURL := a.poller.WebSocketURL(); wsURL != "" { a.wsClient = ws.New(wsURL, a.client.NodeToken(), func(msg ws.PushMessage) { // Forward messages to the main goroutine via channel @@ -124,6 +126,8 @@ func (a *Agent) Run() error { a.sendHeartbeat() case msg := <-a.wsCh: a.handleWsMessage(msg) + case <-a.immediateHeartbeatCh: + a.sendHeartbeat() } } } @@ -334,13 +338,18 @@ func (a *Agent) handleWsMessage(msg ws.PushMessage) { // triggerImmediateHeartbeat sends a heartbeat soon, debounced to 1 second. // Multiple calls within 1s collapse into a single heartbeat with the latest state. -// MUST be called from the main goroutine (same goroutine as Run()'s select loop). +// The timer fires a signal back to the main goroutine's select loop, ensuring +// sendHeartbeat() always runs on the main goroutine (no data race on updateError). +// MUST be called from the main goroutine. func (a *Agent) triggerImmediateHeartbeat() { if a.immediateHeartbeat != nil { a.immediateHeartbeat.Stop() } a.immediateHeartbeat = time.AfterFunc(time.Second, func() { - a.sendHeartbeat() + select { + case a.immediateHeartbeatCh <- struct{}{}: + default: + } }) } diff --git a/agent/internal/agent/poller.go b/agent/internal/agent/poller.go index 0731d90..5f9426a 100644 --- a/agent/internal/agent/poller.go +++ b/agent/internal/agent/poller.go @@ -200,11 +200,15 @@ func (p *poller) Poll() ([]PipelineAction, error) { // SampleRequests returns the sample requests from the last poll response. func (p *poller) SampleRequests() []client.SampleRequestMsg { + p.mu.Lock() + defer p.mu.Unlock() return p.sampleRequests } // PendingAction returns the pending action from the last poll response, if any. 
func (p *poller) PendingAction() *client.PendingAction { + p.mu.Lock() + defer p.mu.Unlock() return p.pendingAction } From 583baf7a30ce0e284c5dc8b91d4c64f1c9f9bac3 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:04:35 +0000 Subject: [PATCH 18/23] fix: resolve CI errors and duplicate sample results - Add explicit type annotations in server.ts to fix TS7006 implicit any - Deduplicate EventSample records: skip if (requestId, componentKey) already exists, preventing duplicates when multiple agents run the same pipeline and submit results concurrently - Remove incorrect // indirect marker on gorilla/websocket in go.mod --- agent/go.mod | 2 +- server.ts | 6 +++--- src/app/api/agent/samples/route.ts | 9 +++++++++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/agent/go.mod b/agent/go.mod index 174bc3b..92a8edf 100644 --- a/agent/go.mod +++ b/agent/go.mod @@ -2,4 +2,4 @@ module github.com/TerrifiedBug/vectorflow/agent go 1.22 -require github.com/gorilla/websocket v1.5.3 // indirect +require github.com/gorilla/websocket v1.5.3 diff --git a/server.ts b/server.ts index 86e73c1..880bcf8 100644 --- a/server.ts +++ b/server.ts @@ -2,7 +2,7 @@ import { createServer, type IncomingMessage } from "http"; import type { Socket } from "net"; import next from "next"; import { parse } from "url"; -import { WebSocketServer } from "ws"; +import { type WebSocket, WebSocketServer } from "ws"; const dev = process.env.NODE_ENV !== "production"; const hostname = process.env.HOSTNAME || "0.0.0.0"; @@ -46,7 +46,7 @@ app.prepare().then(async () => { }); }); - wss.on("connection", (ws, _req, agent: { nodeId: string; environmentId: string }) => { + wss.on("connection", (ws: WebSocket, _req: IncomingMessage, agent: { nodeId: string; environmentId: string }) => { const { nodeId } = agent; console.log(`[ws] agent connected: ${nodeId}`); wsRegistry.register(nodeId, ws); @@ -85,7 +85,7 @@ app.prepare().then(async () => { wsRegistry.unregister(nodeId); }); - 
ws.on("error", (err) => { + ws.on("error", (err: Error) => { console.error(`[ws] error for agent ${nodeId}:`, err.message); clearInterval(pingInterval); if (pongTimer) clearTimeout(pongTimer); diff --git a/src/app/api/agent/samples/route.ts b/src/app/api/agent/samples/route.ts index b0a0e16..8510a55 100644 --- a/src/app/api/agent/samples/route.ts +++ b/src/app/api/agent/samples/route.ts @@ -50,6 +50,15 @@ export async function POST(request: Request) { continue; } + // Check if this component key already has a sample for this request + // (multiple agents may submit results for the same pipeline — first one wins) + const existing = await prisma.eventSample.findFirst({ + where: { requestId: result.requestId, componentKey: result.componentKey }, + }); + if (existing) { + continue; + } + if (result.error) { await prisma.eventSampleRequest.update({ where: { id: result.requestId }, From f5548fe7585f51bf902d160c4df3f0d9c8560194 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:27:16 +0000 Subject: [PATCH 19/23] fix: reconnection race, auth performance, and sample dedup - ws-registry: unregister() now takes optional ws param and only removes if it matches the current socket, preventing stale close handlers from evicting a newer reconnection - ws-auth: add in-memory token cache so reconnects do O(1) lookup instead of O(n) bcrypt scan across all nodes - samples endpoint: replace findFirst TOCTOU check with unique constraint on (requestId, componentKey) + catch P2002 on create for race-safe dedup - Add migration for EventSample unique constraint --- .../migration.sql | 2 + prisma/schema.prisma | 1 + server.ts | 4 +- src/app/api/agent/samples/route.ts | 63 ++++++++++--------- src/server/services/ws-auth.ts | 31 +++++++-- src/server/services/ws-registry.ts | 9 ++- 6 files changed, 75 insertions(+), 35 deletions(-) create mode 100644 prisma/migrations/20260311010000_add_event_sample_unique_constraint/migration.sql diff --git 
a/prisma/migrations/20260311010000_add_event_sample_unique_constraint/migration.sql b/prisma/migrations/20260311010000_add_event_sample_unique_constraint/migration.sql new file mode 100644 index 0000000..dabc5d9 --- /dev/null +++ b/prisma/migrations/20260311010000_add_event_sample_unique_constraint/migration.sql @@ -0,0 +1,2 @@ +-- CreateIndex +CREATE UNIQUE INDEX "EventSample_requestId_componentKey_key" ON "EventSample"("requestId", "componentKey"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 50e3076..b529d5f 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -364,6 +364,7 @@ model EventSample { error String? sampledAt DateTime @default(now()) + @@unique([requestId, componentKey]) @@index([pipelineId, componentKey]) } diff --git a/server.ts b/server.ts index 880bcf8..5f401d3 100644 --- a/server.ts +++ b/server.ts @@ -82,14 +82,14 @@ app.prepare().then(async () => { console.log(`[ws] agent disconnected: ${nodeId}`); clearInterval(pingInterval); if (pongTimer) clearTimeout(pongTimer); - wsRegistry.unregister(nodeId); + wsRegistry.unregister(nodeId, ws); }); ws.on("error", (err: Error) => { console.error(`[ws] error for agent ${nodeId}:`, err.message); clearInterval(pingInterval); if (pongTimer) clearTimeout(pongTimer); - wsRegistry.unregister(nodeId); + wsRegistry.unregister(nodeId, ws); }); }); diff --git a/src/app/api/agent/samples/route.ts b/src/app/api/agent/samples/route.ts index 8510a55..256da62 100644 --- a/src/app/api/agent/samples/route.ts +++ b/src/app/api/agent/samples/route.ts @@ -1,4 +1,5 @@ import { NextResponse } from "next/server"; +import { Prisma } from "@/generated/prisma"; import { prisma } from "@/lib/prisma"; import { authenticateAgent } from "@/server/services/agent-auth"; import { z } from "zod"; @@ -24,6 +25,11 @@ const sampleResultSchema = z.object({ ), }); +/** Returns true if this is a Prisma unique constraint violation (P2002). 
*/ +function isUniqueViolation(err: unknown): boolean { + return err instanceof Prisma.PrismaClientKnownRequestError && err.code === "P2002"; +} + export async function POST(request: Request) { const agent = await authenticateAgent(request); if (!agent) { @@ -50,40 +56,41 @@ export async function POST(request: Request) { continue; } - // Check if this component key already has a sample for this request - // (multiple agents may submit results for the same pipeline — first one wins) - const existing = await prisma.eventSample.findFirst({ - where: { requestId: result.requestId, componentKey: result.componentKey }, - }); - if (existing) { - continue; - } - if (result.error) { await prisma.eventSampleRequest.update({ where: { id: result.requestId }, data: { status: "ERROR", completedAt: new Date(), nodeId: agent.nodeId }, }); - await prisma.eventSample.create({ - data: { - requestId: result.requestId, - pipelineId: sampleRequest.pipelineId, - componentKey: result.componentKey, - events: [], - schema: [], - error: result.error, - }, - }); + try { + await prisma.eventSample.create({ + data: { + requestId: result.requestId, + pipelineId: sampleRequest.pipelineId, + componentKey: result.componentKey, + events: [], + schema: [], + error: result.error, + }, + }); + } catch (err) { + if (isUniqueViolation(err)) continue; // another agent already submitted + throw err; + } } else { - await prisma.eventSample.create({ - data: { - requestId: result.requestId, - pipelineId: sampleRequest.pipelineId, - componentKey: result.componentKey, - events: result.events as object[], - schema: result.schema, - }, - }); + try { + await prisma.eventSample.create({ + data: { + requestId: result.requestId, + pipelineId: sampleRequest.pipelineId, + componentKey: result.componentKey, + events: result.events as object[], + schema: result.schema, + }, + }); + } catch (err) { + if (isUniqueViolation(err)) continue; // another agent already submitted + throw err; + } const componentKeys = 
sampleRequest.componentKeys as string[]; const completedKeys = await prisma.eventSample.findMany({ diff --git a/src/server/services/ws-auth.ts b/src/server/services/ws-auth.ts index d9cfe1a..947c643 100644 --- a/src/server/services/ws-auth.ts +++ b/src/server/services/ws-auth.ts @@ -2,11 +2,16 @@ import type { IncomingMessage } from "http"; import { prisma } from "@/lib/prisma"; import { extractBearerToken, verifyNodeToken } from "./agent-token"; +/** Cache verified tokens to avoid O(n) bcrypt scan on every WS upgrade. + * Key: plaintext token, Value: { nodeId, environmentId }. + * Entries are evicted when the token fails verification (node re-enrolled). */ +const tokenCache = new Map(); + /** - * Authenticate a WebSocket upgrade request by verifying its Bearer token - * against all node tokens. + * Authenticate a WebSocket upgrade request by verifying its Bearer token. * - * Returns the matching node and environment IDs, or null if authentication fails. + * Uses an in-memory cache so reconnects (same token) are O(1) instead of + * scanning all node hashes with bcrypt. 
*/ export async function authenticateWsUpgrade( req: IncomingMessage, @@ -19,6 +24,22 @@ export async function authenticateWsUpgrade( return null; } + // Fast path: check cache first (O(1) string lookup) + const cached = tokenCache.get(token); + if (cached) { + // Verify the node still exists and the hash still matches (re-enrollment invalidates) + const node = await prisma.vectorNode.findUnique({ + where: { id: cached.nodeId }, + select: { nodeTokenHash: true }, + }); + if (node?.nodeTokenHash && await verifyNodeToken(token, node.nodeTokenHash)) { + return cached; + } + // Cache stale — node deleted or re-enrolled + tokenCache.delete(token); + } + + // Slow path: scan all nodes with bcrypt const nodes = await prisma.vectorNode.findMany({ where: { nodeTokenHash: { not: null } }, select: { id: true, environmentId: true, nodeTokenHash: true }, @@ -28,7 +49,9 @@ export async function authenticateWsUpgrade( if (!node.nodeTokenHash) continue; const valid = await verifyNodeToken(token, node.nodeTokenHash); if (valid) { - return { nodeId: node.id, environmentId: node.environmentId }; + const result = { nodeId: node.id, environmentId: node.environmentId }; + tokenCache.set(token, result); + return result; } } diff --git a/src/server/services/ws-registry.ts b/src/server/services/ws-registry.ts index b20fd69..2c60e5b 100644 --- a/src/server/services/ws-registry.ts +++ b/src/server/services/ws-registry.ts @@ -12,7 +12,14 @@ class WsRegistry { this.connections.set(nodeId, ws); } - unregister(nodeId: string): void { + /** Remove a connection. If `ws` is provided, only remove if it matches the + * current registered socket — prevents a stale close handler from removing + * a newer reconnection. 
*/ + unregister(nodeId: string, ws?: WebSocket): void { + if (ws) { + const current = this.connections.get(nodeId); + if (current !== ws) return; // stale socket — newer connection already registered + } this.connections.delete(nodeId); } From ccbe6ad0936ebf115e14583630806e95499bbd52 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 00:15:35 +0000 Subject: [PATCH 20/23] fix: address final Greptile review findings - Move EventSampleRequest ERROR status update after EventSample.create to prevent permanent ERROR state when another agent already submitted - Replace plaintext token cache keys with SHA-256 hashes to avoid storing credentials in process memory - Wire poll_interval WS message to actually reset the poll ticker --- agent/internal/agent/agent.go | 6 ++++-- src/app/api/agent/samples/route.ts | 10 ++++++---- src/server/services/ws-auth.ts | 16 ++++++++++++---- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/agent/internal/agent/agent.go b/agent/internal/agent/agent.go index d3dc108..8cea003 100644 --- a/agent/internal/agent/agent.go +++ b/agent/internal/agent/agent.go @@ -125,7 +125,7 @@ func (a *Agent) Run() error { a.pollAndApply() a.sendHeartbeat() case msg := <-a.wsCh: - a.handleWsMessage(msg) + a.handleWsMessage(msg, ticker) case <-a.immediateHeartbeatCh: a.sendHeartbeat() } @@ -295,7 +295,7 @@ func (a *Agent) processSampleRequests(requests []client.SampleRequestMsg) { // handleWsMessage processes a push message from the WebSocket channel. // MUST be called from the main goroutine (same goroutine as Run()'s select loop). 
-func (a *Agent) handleWsMessage(msg ws.PushMessage) { +func (a *Agent) handleWsMessage(msg ws.PushMessage, ticker *time.Ticker) { switch msg.Type { case "config_changed": slog.Info("ws: config changed notification", "pipeline", msg.PipelineID, "reason", msg.Reason) @@ -328,6 +328,8 @@ func (a *Agent) handleWsMessage(msg ws.PushMessage) { case "poll_interval": if msg.IntervalMs > 0 { + newInterval := time.Duration(msg.IntervalMs) * time.Millisecond + ticker.Reset(newInterval) slog.Info("ws: poll interval changed", "intervalMs", msg.IntervalMs) } diff --git a/src/app/api/agent/samples/route.ts b/src/app/api/agent/samples/route.ts index 256da62..eedcfb6 100644 --- a/src/app/api/agent/samples/route.ts +++ b/src/app/api/agent/samples/route.ts @@ -57,10 +57,6 @@ export async function POST(request: Request) { } if (result.error) { - await prisma.eventSampleRequest.update({ - where: { id: result.requestId }, - data: { status: "ERROR", completedAt: new Date(), nodeId: agent.nodeId }, - }); try { await prisma.eventSample.create({ data: { @@ -76,6 +72,12 @@ export async function POST(request: Request) { if (isUniqueViolation(err)) continue; // another agent already submitted throw err; } + // Update status AFTER successful EventSample write — if another agent + // already submitted a success, the unique constraint above skips this. 
+ await prisma.eventSampleRequest.update({ + where: { id: result.requestId }, + data: { status: "ERROR", completedAt: new Date(), nodeId: agent.nodeId }, + }); } else { try { await prisma.eventSample.create({ diff --git a/src/server/services/ws-auth.ts b/src/server/services/ws-auth.ts index 947c643..6ca52c2 100644 --- a/src/server/services/ws-auth.ts +++ b/src/server/services/ws-auth.ts @@ -1,9 +1,16 @@ import type { IncomingMessage } from "http"; +import { createHash } from "crypto"; import { prisma } from "@/lib/prisma"; import { extractBearerToken, verifyNodeToken } from "./agent-token"; +/** Fast, non-reversible hash used as cache key instead of the raw plaintext + * token — avoids keeping credentials in process memory. */ +function tokenCacheKey(token: string): string { + return createHash("sha256").update(token).digest("hex"); +} + /** Cache verified tokens to avoid O(n) bcrypt scan on every WS upgrade. - * Key: plaintext token, Value: { nodeId, environmentId }. + * Key: SHA-256 hash of token, Value: { nodeId, environmentId }. * Entries are evicted when the token fails verification (node re-enrolled). 
*/ const tokenCache = new Map(); @@ -25,7 +32,8 @@ export async function authenticateWsUpgrade( } // Fast path: check cache first (O(1) string lookup) - const cached = tokenCache.get(token); + const cacheKey = tokenCacheKey(token); + const cached = tokenCache.get(cacheKey); if (cached) { // Verify the node still exists and the hash still matches (re-enrollment invalidates) const node = await prisma.vectorNode.findUnique({ @@ -36,7 +44,7 @@ export async function authenticateWsUpgrade( return cached; } // Cache stale — node deleted or re-enrolled - tokenCache.delete(token); + tokenCache.delete(cacheKey); } // Slow path: scan all nodes with bcrypt @@ -50,7 +58,7 @@ export async function authenticateWsUpgrade( const valid = await verifyNodeToken(token, node.nodeTokenHash); if (valid) { const result = { nodeId: node.id, environmentId: node.environmentId }; - tokenCache.set(token, result); + tokenCache.set(cacheKey, result); return result; } } From da937d6e9b62b43211dd9df5c65f1bd047621306 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 00:28:19 +0000 Subject: [PATCH 21/23] fix: cross-environment auth and status race in samples endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Verify sample request's pipeline belongs to the agent's environment before allowing writes (prevents cross-environment data injection) - Use updateMany with status: "PENDING" filter for atomic status transitions — concurrent success/error submissions cannot clobber each other's final status --- src/app/api/agent/samples/route.ts | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/app/api/agent/samples/route.ts b/src/app/api/agent/samples/route.ts index eedcfb6..ac9f134 100644 --- a/src/app/api/agent/samples/route.ts +++ b/src/app/api/agent/samples/route.ts @@ -51,11 +51,17 @@ export async function POST(request: Request) { for (const result of results) { const sampleRequest = await 
prisma.eventSampleRequest.findUnique({ where: { id: result.requestId }, + include: { pipeline: { select: { environmentId: true } } }, }); if (!sampleRequest || sampleRequest.status !== "PENDING") { continue; } + // Verify the request's pipeline belongs to this agent's environment + if (sampleRequest.pipeline.environmentId !== agent.environmentId) { + continue; + } + if (result.error) { try { await prisma.eventSample.create({ @@ -72,10 +78,10 @@ export async function POST(request: Request) { if (isUniqueViolation(err)) continue; // another agent already submitted throw err; } - // Update status AFTER successful EventSample write — if another agent - // already submitted a success, the unique constraint above skips this. - await prisma.eventSampleRequest.update({ - where: { id: result.requestId }, + // Atomically transition PENDING → ERROR; no-op if another agent already + // moved the status (e.g. to COMPLETED), preventing status clobbering. + await prisma.eventSampleRequest.updateMany({ + where: { id: result.requestId, status: "PENDING" }, data: { status: "ERROR", completedAt: new Date(), nodeId: agent.nodeId }, }); } else { @@ -103,8 +109,10 @@ export async function POST(request: Request) { const allDone = componentKeys.every((k) => completedKeySet.has(k)); if (allDone) { - await prisma.eventSampleRequest.update({ - where: { id: result.requestId }, + // Atomically transition PENDING → COMPLETED; no-op if already moved + // (e.g. an error submission raced and set ERROR first). 
+ await prisma.eventSampleRequest.updateMany({ + where: { id: result.requestId, status: "PENDING" }, data: { status: "COMPLETED", completedAt: new Date(), nodeId: agent.nodeId }, }); } From ce90f8a9ade24bdfb247d9de3e8ec398a32ef8f1 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 00:37:41 +0000 Subject: [PATCH 22/23] fix: unify sample status logic to prevent COMPLETED/ERROR race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Merge success and error paths into a single flow: write EventSample, then check if all components have samples. Final status is determined by whether any samples have errors — COMPLETED if all succeeded, ERROR if any failed. The atomic updateMany with status: "PENDING" filter ensures only one writer transitions the request. --- src/app/api/agent/samples/route.ts | 85 ++++++++++++------------------ 1 file changed, 34 insertions(+), 51 deletions(-) diff --git a/src/app/api/agent/samples/route.ts b/src/app/api/agent/samples/route.ts index ac9f134..3b173ae 100644 --- a/src/app/api/agent/samples/route.ts +++ b/src/app/api/agent/samples/route.ts @@ -62,60 +62,43 @@ export async function POST(request: Request) { continue; } - if (result.error) { - try { - await prisma.eventSample.create({ - data: { - requestId: result.requestId, - pipelineId: sampleRequest.pipelineId, - componentKey: result.componentKey, - events: [], - schema: [], - error: result.error, - }, - }); - } catch (err) { - if (isUniqueViolation(err)) continue; // another agent already submitted - throw err; - } - // Atomically transition PENDING → ERROR; no-op if another agent already - // moved the status (e.g. to COMPLETED), preventing status clobbering. 
- await prisma.eventSampleRequest.updateMany({ - where: { id: result.requestId, status: "PENDING" }, - data: { status: "ERROR", completedAt: new Date(), nodeId: agent.nodeId }, + // Write the EventSample (success or error) + try { + await prisma.eventSample.create({ + data: { + requestId: result.requestId, + pipelineId: sampleRequest.pipelineId, + componentKey: result.componentKey, + events: result.error ? [] : (result.events as object[]), + schema: result.error ? [] : result.schema, + error: result.error ?? null, + }, }); - } else { - try { - await prisma.eventSample.create({ - data: { - requestId: result.requestId, - pipelineId: sampleRequest.pipelineId, - componentKey: result.componentKey, - events: result.events as object[], - schema: result.schema, - }, - }); - } catch (err) { - if (isUniqueViolation(err)) continue; // another agent already submitted - throw err; - } + } catch (err) { + if (isUniqueViolation(err)) continue; // another agent already submitted + throw err; + } - const componentKeys = sampleRequest.componentKeys as string[]; - const completedKeys = await prisma.eventSample.findMany({ - where: { requestId: result.requestId }, - select: { componentKey: true }, - }); - const completedKeySet = new Set(completedKeys.map((s) => s.componentKey)); - const allDone = componentKeys.every((k) => completedKeySet.has(k)); + // Check if all components now have samples (success or error) + const componentKeys = sampleRequest.componentKeys as string[]; + const samples = await prisma.eventSample.findMany({ + where: { requestId: result.requestId }, + select: { componentKey: true, error: true }, + }); + const sampledKeySet = new Set(samples.map((s) => s.componentKey)); + const allDone = componentKeys.every((k) => sampledKeySet.has(k)); - if (allDone) { - // Atomically transition PENDING → COMPLETED; no-op if already moved - // (e.g. an error submission raced and set ERROR first). 
- await prisma.eventSampleRequest.updateMany({ - where: { id: result.requestId, status: "PENDING" }, - data: { status: "COMPLETED", completedAt: new Date(), nodeId: agent.nodeId }, - }); - } + if (allDone) { + const hasErrors = samples.some((s) => s.error != null); + // Atomically transition PENDING → final status; no-op if already moved. + await prisma.eventSampleRequest.updateMany({ + where: { id: result.requestId, status: "PENDING" }, + data: { + status: hasErrors ? "ERROR" : "COMPLETED", + completedAt: new Date(), + nodeId: agent.nodeId, + }, + }); } } From dbf1379e1ad4f0c665a82ec9265be65dfe75bb6a Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 00:51:34 +0000 Subject: [PATCH 23/23] fix: handle restart action and add token cache TTL/size bounds - Log warning and trigger re-poll for unimplemented "restart" WS action instead of silently discarding it - Add 30-minute TTL and 1000-entry cap to tokenCache to prevent unbounded memory growth from re-enrolled nodes --- agent/internal/agent/agent.go | 9 +++++- src/server/services/ws-auth.ts | 59 +++++++++++++++++++++++++--------- 2 files changed, 52 insertions(+), 16 deletions(-) diff --git a/agent/internal/agent/agent.go b/agent/internal/agent/agent.go index 8cea003..cde2066 100644 --- a/agent/internal/agent/agent.go +++ b/agent/internal/agent/agent.go @@ -316,7 +316,8 @@ func (a *Agent) handleWsMessage(msg ws.PushMessage, ticker *time.Ticker) { case "action": slog.Info("ws: action received", "action", msg.Action) - if msg.Action == "self_update" { + switch msg.Action { + case "self_update": a.handlePendingAction(&client.PendingAction{ Type: "self_update", TargetVersion: msg.TargetVersion, @@ -324,6 +325,12 @@ func (a *Agent) handleWsMessage(msg ws.PushMessage, ticker *time.Ticker) { Checksum: msg.Checksum, }) a.triggerImmediateHeartbeat() + case "restart": + slog.Warn("ws: restart action not yet implemented, triggering re-poll instead") + a.pollAndApply() + a.triggerImmediateHeartbeat() + default: + 
slog.Warn("ws: unknown action", "action", msg.Action) } case "poll_interval": diff --git a/src/server/services/ws-auth.ts b/src/server/services/ws-auth.ts index 6ca52c2..ac6c678 100644 --- a/src/server/services/ws-auth.ts +++ b/src/server/services/ws-auth.ts @@ -9,10 +9,30 @@ function tokenCacheKey(token: string): string { return createHash("sha256").update(token).digest("hex"); } +const TOKEN_CACHE_TTL_MS = 30 * 60 * 1000; // 30 minutes +const TOKEN_CACHE_MAX_SIZE = 1000; + +interface CacheEntry { + nodeId: string; + environmentId: string; + cachedAt: number; +} + /** Cache verified tokens to avoid O(n) bcrypt scan on every WS upgrade. - * Key: SHA-256 hash of token, Value: { nodeId, environmentId }. - * Entries are evicted when the token fails verification (node re-enrolled). */ -const tokenCache = new Map(); + * Key: SHA-256 hash of token, Value: { nodeId, environmentId, cachedAt }. + * Entries expire after 30 minutes; once the cache holds more than 1000 entries, expired ones are pruned before the next insert. */ +const tokenCache = new Map(); + +/** Remove expired entries, but only once the cache exceeds TOKEN_CACHE_MAX_SIZE. Called before a newly verified token is inserted, not on cache lookups; unexpired entries are never dropped, so the size limit is soft. */ +function evictStale(): void { + if (tokenCache.size <= TOKEN_CACHE_MAX_SIZE) return; + const now = Date.now(); + for (const [key, entry] of tokenCache) { + if (now - entry.cachedAt > TOKEN_CACHE_TTL_MS) { + tokenCache.delete(key); + } + } +} /** * Authenticate a WebSocket upgrade request by verifying its Bearer token. 
@@ -35,16 +55,21 @@ export async function authenticateWsUpgrade( const cacheKey = tokenCacheKey(token); const cached = tokenCache.get(cacheKey); if (cached) { - // Verify the node still exists and the hash still matches (re-enrollment invalidates) - const node = await prisma.vectorNode.findUnique({ - where: { id: cached.nodeId }, - select: { nodeTokenHash: true }, - }); - if (node?.nodeTokenHash && await verifyNodeToken(token, node.nodeTokenHash)) { - return cached; + // Evict if TTL expired + if (Date.now() - cached.cachedAt > TOKEN_CACHE_TTL_MS) { + tokenCache.delete(cacheKey); + } else { + // Verify the node still exists and the hash still matches (re-enrollment invalidates) + const node = await prisma.vectorNode.findUnique({ + where: { id: cached.nodeId }, + select: { nodeTokenHash: true }, + }); + if (node?.nodeTokenHash && await verifyNodeToken(token, node.nodeTokenHash)) { + return { nodeId: cached.nodeId, environmentId: cached.environmentId }; + } + // Cache stale — node deleted or re-enrolled + tokenCache.delete(cacheKey); } - // Cache stale — node deleted or re-enrolled - tokenCache.delete(cacheKey); } // Slow path: scan all nodes with bcrypt @@ -57,9 +82,13 @@ export async function authenticateWsUpgrade( if (!node.nodeTokenHash) continue; const valid = await verifyNodeToken(token, node.nodeTokenHash); if (valid) { - const result = { nodeId: node.id, environmentId: node.environmentId }; - tokenCache.set(cacheKey, result); - return result; + evictStale(); + tokenCache.set(cacheKey, { + nodeId: node.id, + environmentId: node.environmentId, + cachedAt: Date.now(), + }); + return { nodeId: node.id, environmentId: node.environmentId }; } }