From 740abc52b3224adfd7341b5ca6868cb537c675ad Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:13:40 +0000 Subject: [PATCH 01/25] feat: add TypeScript types for structured AI suggestions --- src/lib/ai/types.ts | 55 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 src/lib/ai/types.ts diff --git a/src/lib/ai/types.ts b/src/lib/ai/types.ts new file mode 100644 index 0000000..b267abf --- /dev/null +++ b/src/lib/ai/types.ts @@ -0,0 +1,55 @@ +// src/lib/ai/types.ts + +export interface AiSuggestionBase { + id: string; + title: string; + description: string; + priority: "high" | "medium" | "low"; +} + +export interface ModifyConfigSuggestion { + type: "modify_config"; + componentKey: string; + changes: Record; +} + +export interface AddComponentSuggestion { + type: "add_component"; + component: { + key: string; + componentType: string; + kind: "source" | "transform" | "sink"; + config: Record; + }; + insertAfter: string; + connectTo: string[]; +} + +export interface RemoveComponentSuggestion { + type: "remove_component"; + componentKey: string; + reconnect: boolean; +} + +export interface ModifyConnectionsSuggestion { + type: "modify_connections"; + edgeChanges: Array<{ + action: "add" | "remove"; + from: string; + to: string; + }>; +} + +export type AiSuggestion = + | (AiSuggestionBase & ModifyConfigSuggestion) + | (AiSuggestionBase & AddComponentSuggestion) + | (AiSuggestionBase & RemoveComponentSuggestion) + | (AiSuggestionBase & ModifyConnectionsSuggestion); + +export interface AiReviewResponse { + summary: string; + suggestions: AiSuggestion[]; +} + +/** State of a suggestion in the UI */ +export type SuggestionStatus = "actionable" | "applied" | "outdated" | "invalid"; From 9ce60a26fd3c990b1e5cd20aa6e56b4c61b4837d Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:15:13 +0000 Subject: [PATCH 02/25] feat: add AiConversation and AiMessage models for persistent AI review threads --- .../migration.sql | 42 +++++++++++++++++++ prisma/schema.prisma | 31 ++++++++++++++ 2 files changed, 73 insertions(+) create mode 100644 prisma/migrations/20260310030000_add_ai_conversations/migration.sql diff --git a/prisma/migrations/20260310030000_add_ai_conversations/migration.sql b/prisma/migrations/20260310030000_add_ai_conversations/migration.sql new file mode 100644 index 0000000..77d5f75 --- /dev/null +++ b/prisma/migrations/20260310030000_add_ai_conversations/migration.sql @@ -0,0 +1,42 @@ +-- CreateTable +CREATE TABLE "AiConversation" ( + "id" TEXT NOT NULL, + "pipelineId" TEXT NOT NULL, + "createdById" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "AiConversation_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "AiMessage" ( + "id" TEXT NOT NULL, + "conversationId" TEXT NOT NULL, + "role" TEXT NOT NULL, + "content" TEXT NOT NULL, + "suggestions" JSONB, + "pipelineYaml" TEXT, + "createdById" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "AiMessage_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "AiConversation_pipelineId_createdAt_idx" ON "AiConversation"("pipelineId", "createdAt"); + +-- CreateIndex +CREATE INDEX "AiMessage_conversationId_createdAt_idx" ON "AiMessage"("conversationId", "createdAt"); + +-- AddForeignKey +ALTER TABLE "AiConversation" ADD CONSTRAINT "AiConversation_pipelineId_fkey" FOREIGN KEY ("pipelineId") REFERENCES "Pipeline"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "AiConversation" ADD CONSTRAINT "AiConversation_createdById_fkey" FOREIGN KEY ("createdById") REFERENCES "User"("id") ON DELETE SET NULL ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "AiMessage" ADD CONSTRAINT "AiMessage_conversationId_fkey" FOREIGN KEY ("conversationId") REFERENCES "AiConversation"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "AiMessage" ADD CONSTRAINT "AiMessage_createdById_fkey" FOREIGN KEY ("createdById") REFERENCES "User"("id") ON DELETE SET NULL ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 50e3076..46679e8 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -35,6 +35,8 @@ model User { deployRequestsReviewed DeployRequest[] @relation("deployReviewer") deployRequestsExecuted DeployRequest[] @relation("deployExecutor") preferences UserPreference[] + aiConversationsCreated AiConversation[] @relation("AiConversationCreatedBy") + aiMessagesCreated AiMessage[] @relation("AiMessageCreatedBy") createdAt DateTime @default(now()) } @@ -277,6 +279,7 @@ model Pipeline { slis PipelineSli[] enrichMetadata Boolean @default(false) tags Json? @default("[]") // string[] of classification tags like ["PII", "PCI-DSS"] + aiConversations AiConversation[] deployRequests DeployRequest[] createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@ -752,3 +755,31 @@ model ServiceAccount { @@index([hashedKey]) @@index([environmentId]) } + +model AiConversation { + id String @id @default(cuid()) + pipelineId String + pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade) + createdById String? + createdBy User? @relation("AiConversationCreatedBy", fields: [createdById], references: [id], onDelete: SetNull) + messages AiMessage[] + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@index([pipelineId, createdAt]) +} + +model AiMessage { + id String @id @default(cuid()) + conversationId String + conversation AiConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade) + role String // "user" | "assistant" + content String + suggestions Json? + pipelineYaml String? + createdById String? + createdBy User? @relation("AiMessageCreatedBy", fields: [createdById], references: [id], onDelete: SetNull) + createdAt DateTime @default(now()) + + @@index([conversationId, createdAt]) +} From 4c8e11611ac18486198f50a06581c7f4aaefe82d Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:16:17 +0000 Subject: [PATCH 03/25] feat: add pure functions for applying AI suggestions to flow state --- src/lib/ai/suggestion-applier.ts | 190 +++++++++++++++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 src/lib/ai/suggestion-applier.ts diff --git a/src/lib/ai/suggestion-applier.ts b/src/lib/ai/suggestion-applier.ts new file mode 100644 index 0000000..257bf79 --- /dev/null +++ b/src/lib/ai/suggestion-applier.ts @@ -0,0 +1,190 @@ +// src/lib/ai/suggestion-applier.ts +import type { Node, Edge } from "@xyflow/react"; +import type { AiSuggestion } from "./types"; +import { findComponentDef } from "@/lib/vector/catalog"; +import { generateComponentKey } from "@/lib/component-key"; +import { generateId } from "@/lib/utils"; + +interface ApplyResult { + nodes: Node[]; + edges: Edge[]; + error?: string; +} + +/** + * Apply a single suggestion to the current flow state. + * Returns new nodes/edges arrays (immutable). + */ +export function applySuggestion( + suggestion: AiSuggestion, + nodes: Node[], + edges: Edge[], +): ApplyResult { + switch (suggestion.type) { + case "modify_config": + return applyModifyConfig(suggestion, nodes, edges); + case "add_component": + return applyAddComponent(suggestion, nodes, edges); + case "remove_component": + return applyRemoveComponent(suggestion, nodes, edges); + case "modify_connections": + return applyModifyConnections(suggestion, nodes, edges); + default: + return { nodes, edges, error: "Unknown suggestion type" }; + } +} + +function findNodeByComponentKey(nodes: Node[], componentKey: string): Node | undefined { + return nodes.find((n) => (n.data as Record).componentKey === componentKey); +} + +function applyModifyConfig( + suggestion: AiSuggestion & { type: "modify_config" }, + nodes: Node[], + edges: Edge[], +): ApplyResult { + const target = findNodeByComponentKey(nodes, suggestion.componentKey); + if (!target) { + return { nodes, edges, error: `Component "${suggestion.componentKey}" not found` }; + } + + const existingConfig = (target.data as Record).config as Record; + const newConfig = { ...existingConfig, ...suggestion.changes }; + + const newNodes = nodes.map((n) => + n.id === target.id + ? { ...n, data: { ...n.data, config: newConfig } } + : n, + ); + + return { nodes: newNodes, edges }; +} + +function applyAddComponent( + suggestion: AiSuggestion & { type: "add_component" }, + nodes: Node[], + edges: Edge[], +): ApplyResult { + const { component, insertAfter, connectTo } = suggestion; + + const componentDef = findComponentDef(component.componentType, component.kind); + if (!componentDef) { + return { nodes, edges, error: `Unknown component type "${component.componentType}"` }; + } + + const afterNode = findNodeByComponentKey(nodes, insertAfter); + if (!afterNode) { + return { nodes, edges, error: `Component "${insertAfter}" not found for insertAfter` }; + } + + const position = { + x: afterNode.position.x, + y: afterNode.position.y + 150, + }; + + const newNodeId = generateId(); + const newComponentKey = generateComponentKey(component.componentType); + + const newNode: Node = { + id: newNodeId, + type: component.kind, + position, + data: { + componentDef, + componentKey: newComponentKey, + displayName: componentDef.displayName, + config: component.config, + }, + }; + + let newEdges = [...edges]; + + // Add edge: afterNode to newNode (once, regardless of connectTo count) + newEdges.push({ id: generateId(), source: afterNode.id, target: newNodeId }); + + for (const downstreamKey of connectTo) { + const downstreamNode = findNodeByComponentKey(nodes, downstreamKey); + if (!downstreamNode) continue; + + // Remove existing edge from afterNode to downstream + newEdges = newEdges.filter( + (e) => !(e.source === afterNode.id && e.target === downstreamNode.id), + ); + + // Add edge: newNode to downstream + newEdges.push({ id: generateId(), source: newNodeId, target: downstreamNode.id }); + } + + return { nodes: [...nodes, newNode], edges: newEdges }; +} + +function applyRemoveComponent( + suggestion: AiSuggestion & { type: "remove_component" }, + nodes: Node[], + edges: Edge[], +): ApplyResult { + const target = findNodeByComponentKey(nodes, suggestion.componentKey); + if (!target) { + return { nodes, edges, error: `Component "${suggestion.componentKey}" not found` }; + } + + if ((target.data as Record).isSystemLocked) { + return { nodes, edges, error: `Component "${suggestion.componentKey}" is system-locked` }; + } + + const incomingEdges = edges.filter((e) => e.target === target.id); + const outgoingEdges = edges.filter((e) => e.source === target.id); + + let newEdges = edges.filter((e) => e.source !== target.id && e.target !== target.id); + const newNodes = nodes.filter((n) => n.id !== target.id); + + if (suggestion.reconnect) { + for (const incoming of incomingEdges) { + for (const outgoing of outgoingEdges) { + newEdges.push({ + id: generateId(), + source: incoming.source, + target: outgoing.target, + }); + } + } + } + + return { nodes: newNodes, edges: newEdges }; +} + +function applyModifyConnections( + suggestion: AiSuggestion & { type: "modify_connections" }, + nodes: Node[], + edges: Edge[], +): ApplyResult { + let newEdges = [...edges]; + + for (const change of suggestion.edgeChanges) { + const fromNode = findNodeByComponentKey(nodes, change.from); + const toNode = findNodeByComponentKey(nodes, change.to); + + if (!fromNode || !toNode) { + return { + nodes, + edges, + error: `Component "${!fromNode ? change.from : change.to}" not found`, + }; + } + + if (change.action === "add") { + const exists = newEdges.some( + (e) => e.source === fromNode.id && e.target === toNode.id, + ); + if (!exists) { + newEdges.push({ id: generateId(), source: fromNode.id, target: toNode.id }); + } + } else { + newEdges = newEdges.filter( + (e) => !(e.source === fromNode.id && e.target === toNode.id), + ); + } + } + + return { nodes, edges: newEdges }; +} From 20fe227f75a68920a9cecf87b69e36276069e887 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:16:22 +0000 Subject: [PATCH 04/25] feat: add conflict detection for selected AI suggestions --- src/lib/ai/conflict-detector.ts | 107 ++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 src/lib/ai/conflict-detector.ts diff --git a/src/lib/ai/conflict-detector.ts b/src/lib/ai/conflict-detector.ts new file mode 100644 index 0000000..ab2611f --- /dev/null +++ b/src/lib/ai/conflict-detector.ts @@ -0,0 +1,107 @@ +// src/lib/ai/conflict-detector.ts +import type { AiSuggestion } from "./types"; + +export interface ConflictPair { + a: string; + b: string; + reason: string; +} + +/** + * Detect conflicts between selected suggestions. + * Returns pairs of conflicting suggestion IDs with reasons. + */ +export function detectConflicts(suggestions: AiSuggestion[]): ConflictPair[] { + const conflicts: ConflictPair[] = []; + + for (let i = 0; i < suggestions.length; i++) { + for (let j = i + 1; j < suggestions.length; j++) { + const conflict = checkPairConflict(suggestions[i], suggestions[j]); + if (conflict) { + conflicts.push({ a: suggestions[i].id, b: suggestions[j].id, reason: conflict }); + } + } + } + + return conflicts; +} + +function checkPairConflict(a: AiSuggestion, b: AiSuggestion): string | null { + // Same-type: two modify_config on same component with overlapping keys + if (a.type === "modify_config" && b.type === "modify_config") { + if (a.componentKey === b.componentKey) { + const keysA = Object.keys(a.changes); + const keysB = Object.keys(b.changes); + const overlap = keysA.filter((k) => keysB.includes(k)); + if (overlap.length > 0) { + return `Both modify "${a.componentKey}" config keys: ${overlap.join(", ")}`; + } + } + } + + // Same-type: contradicting modify_connections + if (a.type === "modify_connections" && b.type === "modify_connections") { + for (const ea of a.edgeChanges) { + for (const eb of b.edgeChanges) { + if (ea.from === eb.from && ea.to === eb.to) { + if (ea.action !== eb.action) { + return `Contradicting edge changes: ${ea.from} to ${ea.to}`; + } + if (ea.action === "add" && eb.action === "add") { + return `Duplicate edge addition: ${ea.from} to ${ea.to}`; + } + } + } + } + } + + // Cross-type: remove_component vs anything referencing that component + if (a.type === "remove_component" || b.type === "remove_component") { + const remover = (a.type === "remove_component" ? a : b) as AiSuggestion & { type: "remove_component" }; + const other = a.type === "remove_component" ? b : a; + const removedKey = remover.componentKey; + const referencedKeys = getReferencedComponentKeys(other); + if (referencedKeys.has(removedKey)) { + return `"${removedKey}" is removed by one suggestion but referenced by another`; + } + } + + // Cross-type: add_component connectTo vs modify_connections removing same edge + if ( + (a.type === "add_component" && b.type === "modify_connections") || + (a.type === "modify_connections" && b.type === "add_component") + ) { + const adder = a.type === "add_component" ? a : b as AiSuggestion & { type: "add_component" }; + const modifier = a.type === "modify_connections" ? a : b as AiSuggestion & { type: "modify_connections" }; + for (const edge of modifier.edgeChanges) { + if (edge.action === "remove" && adder.connectTo.includes(edge.to)) { + return `add_component connects to "${edge.to}" but modify_connections removes an edge to it`; + } + } + } + + return null; +} + +function getReferencedComponentKeys(s: AiSuggestion): Set { + const keys = new Set(); + switch (s.type) { + case "modify_config": + keys.add(s.componentKey); + break; + case "add_component": + keys.add(s.insertAfter); + for (const k of s.connectTo) keys.add(k); + break; + case "remove_component": + keys.add(s.componentKey); + break; + case "modify_connections": + for (const e of s.edgeChanges) { + keys.add(e.from); + keys.add(e.to); + } + break; + } + return keys; +} From a6903c6928d46d3311a52ea9e3209703cc67b859 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:16:32 +0000 Subject: [PATCH 05/25] refactor: extend streamCompletion to accept conversation message history --- src/app/api/ai/pipeline/route.ts | 2 +- src/app/api/ai/vrl/route.ts | 2 +- src/server/services/ai.ts | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/app/api/ai/pipeline/route.ts b/src/app/api/ai/pipeline/route.ts index bd4ed1d..29d9386 100644 --- a/src/app/api/ai/pipeline/route.ts +++ b/src/app/api/ai/pipeline/route.ts @@ -83,7 +83,7 @@ export async function POST(request: Request) { await streamCompletion({ teamId: body.teamId, systemPrompt, - userPrompt: body.prompt, + messages: [{ role: "user", content: body.prompt }], onToken: (token) => { const data = JSON.stringify({ token }); controller.enqueue(encoder.encode(`data: ${data}\n\n`)); diff --git a/src/app/api/ai/vrl/route.ts b/src/app/api/ai/vrl/route.ts index cc8b348..cc1c452 100644 --- a/src/app/api/ai/vrl/route.ts +++ b/src/app/api/ai/vrl/route.ts @@ -76,7 +76,7 @@ export async function POST(request: Request) { await streamCompletion({ teamId: body.teamId, systemPrompt, - userPrompt: body.prompt, + messages: [{ role: "user", content: body.prompt }], onToken: (token) => { const data = JSON.stringify({ token }); controller.enqueue(encoder.encode(`data: ${data}\n\n`)); diff --git a/src/server/services/ai.ts b/src/server/services/ai.ts index b29aa18..0d81a16 100644 --- a/src/server/services/ai.ts +++ b/src/server/services/ai.ts @@ -53,7 +53,7 @@ function validateBaseUrl(baseUrl: string): void { interface StreamCompletionParams { teamId: string; systemPrompt: string; - userPrompt: string; + messages: Array<{ role: "user" | "assistant"; content: string }>; onToken: (token: string) => void; signal?: AbortSignal; } @@ -102,7 +102,7 @@ export async function getTeamAiConfig(teamId: string, { requireEnabled = true } export async function streamCompletion({ teamId, systemPrompt, - userPrompt, + messages, onToken, signal, }: StreamCompletionParams): Promise { @@ -126,8 +126,8 @@ export async function streamCompletion({ model: config.model, stream: true, messages: [ - { role: "system", content: systemPrompt }, - { role: "user", content: userPrompt }, + { role: "system" as const, content: systemPrompt }, + ...messages.map((m) => ({ role: m.role as "user" | "assistant", content: m.content })), ], }), signal, From 432394ac78f66b6de2e2ac745e32c29d931d459c Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:17:00 +0000 Subject: [PATCH 06/25] feat: add AI suggestion validation and outdated detection utilities --- src/lib/ai/suggestion-validator.ts | 101 +++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 src/lib/ai/suggestion-validator.ts diff --git a/src/lib/ai/suggestion-validator.ts b/src/lib/ai/suggestion-validator.ts new file mode 100644 index 0000000..2fc0597 --- /dev/null +++ b/src/lib/ai/suggestion-validator.ts @@ -0,0 +1,101 @@ +// src/lib/ai/suggestion-validator.ts +import type { Node } from "@xyflow/react"; +import type { AiSuggestion, AiReviewResponse, SuggestionStatus } from "./types"; + +/** + * Validate a parsed AI response. Returns the response if valid, null if not. + */ +export function parseAiReviewResponse(raw: string): AiReviewResponse | null { + try { + const parsed = JSON.parse(raw); + if ( + typeof parsed === "object" && + parsed !== null && + typeof parsed.summary === "string" && + Array.isArray(parsed.suggestions) + ) { + return parsed as AiReviewResponse; + } + return null; + } catch { + return null; + } +} + +/** + * Validate that suggestion references exist on the canvas. + */ +export function validateSuggestions( + suggestions: AiSuggestion[], + nodes: Node[], +): Map { + const componentKeys = new Set( + nodes.map((n) => (n.data as Record).componentKey as string), + ); + + const statuses = new Map(); + + for (const s of suggestions) { + const referencedKeys = getReferencedKeys(s); + const allValid = referencedKeys.every((k) => componentKeys.has(k)); + statuses.set(s.id, allValid ? "actionable" : "invalid"); + } + + return statuses; +} + +/** + * Determine which suggestions are outdated by comparing pipeline YAML snapshots. + */ +export function detectOutdatedSuggestions( + suggestions: AiSuggestion[], + snapshotYaml: string | null, + currentYaml: string, +): Set { + if (!snapshotYaml || snapshotYaml === currentYaml) { + return new Set(); + } + + const outdated = new Set(); + + for (const s of suggestions) { + const keys = getReferencedKeys(s); + for (const key of keys) { + const snapshotBlock = extractComponentBlock(snapshotYaml, key); + const currentBlock = extractComponentBlock(currentYaml, key); + if (snapshotBlock !== currentBlock) { + outdated.add(s.id); + break; + } + } + } + + return outdated; +} + +function getReferencedKeys(s: AiSuggestion): string[] { + switch (s.type) { + case "modify_config": + return [s.componentKey]; + case "add_component": + return [s.insertAfter, ...s.connectTo]; + case "remove_component": + return [s.componentKey]; + case "modify_connections": + return s.edgeChanges.flatMap((e) => [e.from, e.to]); + } +} + +function extractComponentBlock(yaml: string, componentKey: string): string | null { + const escaped = componentKey.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); + const regex = new RegExp(`^ ${escaped}:\\s*$`, "m"); + const match = regex.exec(yaml); + if (!match) return null; + + const start = match.index; + const rest = yaml.slice(start + match[0].length); + const nextKey = rest.search(/^\s{2}\S/m); + const end = nextKey === -1 ? yaml.length : start + match[0].length + nextKey; + + return yaml.slice(start, end).trim(); +} From c1299733adbd421dfcd785c80436c05745f228e7 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:17:22 +0000 Subject: [PATCH 07/25] feat: rewrite AI review prompt to return structured JSON suggestions --- src/lib/ai/prompts.ts | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/src/lib/ai/prompts.ts b/src/lib/ai/prompts.ts index bfe5cb9..6652414 100644 --- a/src/lib/ai/prompts.ts +++ b/src/lib/ai/prompts.ts @@ -61,10 +61,37 @@ export function buildPipelineSystemPrompt(context: { } else { parts.push( "You are a Vector pipeline configuration reviewer.", - "Analyze the provided Vector pipeline YAML and provide improvement suggestions.", - "Focus on: performance, correctness, best practices, and potential issues.", - "If the user asks for a revised config, output the complete corrected YAML with no markdown fencing.", - "Otherwise, provide suggestions as concise text.", + "Analyze the provided Vector pipeline YAML and return your response as a JSON object.", + "", + "Response format (return ONLY this JSON, no markdown fencing, no extra text):", + JSON.stringify({ + summary: "2-3 sentence analysis of the pipeline", + suggestions: [ + { + id: "s1", + type: "modify_config", + title: "Short title", + description: "Why this helps", + priority: "high|medium|low", + componentKey: "existing_component_key", + changes: { "config.field": "new_value" }, + }, + ], + }, null, 2), + "", + "Suggestion types:", + '- modify_config: { type: "modify_config", componentKey, changes: { field: value } }', + '- add_component: { type: "add_component", component: { key, componentType, kind: "source"|"transform"|"sink", config }, insertAfter: "existing_key", connectTo: ["downstream_key"] }', + '- remove_component: { type: "remove_component", componentKey, reconnect: true|false }', + '- modify_connections: { type: "modify_connections", edgeChanges: [{ action: "add"|"remove", from: "key", to: "key" }] }', + "", + "Rules:", + "- Each suggestion needs a unique id (s1, s2, s3...)", + "- componentKey values MUST match real keys from the provided YAML", + "- Focus on: performance, correctness, best practices, potential issues", + "- Prioritize: high = likely bug or major perf issue, medium = optimization, low = cleanup", + "- Return valid JSON only. No markdown, no code fences, no commentary outside the JSON.", + "- Even in follow-up messages, always return the full JSON object. Never mix prose with JSON.", ); } From 3bf2cf9541cb934d30848576f06cdc30c298f09a Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:18:34 +0000 Subject: [PATCH 08/25] feat: add tRPC AI router for conversation CRUD and suggestion tracking --- src/server/routers/ai.ts | 99 ++++++++++++++++++++++++++++++++++++++++ src/trpc/router.ts | 2 + 2 files changed, 101 insertions(+) create mode 100644 src/server/routers/ai.ts diff --git a/src/server/routers/ai.ts b/src/server/routers/ai.ts new file mode 100644 index 0000000..39ff7cb --- /dev/null +++ b/src/server/routers/ai.ts @@ -0,0 +1,99 @@ +import { z } from "zod"; +import { router, protectedProcedure, withTeamAccess } from "@/trpc/init"; +import { prisma } from "@/lib/prisma"; +import { writeAuditLog } from "@/server/services/audit"; + +export const aiRouter = router({ + getConversation: protectedProcedure + .input(z.object({ pipelineId: z.string() })) + .use(withTeamAccess("VIEWER")) + .query(async ({ input }) => { + const conversation = await prisma.aiConversation.findFirst({ + where: { pipelineId: input.pipelineId }, + orderBy: { createdAt: "desc" }, + include: { + messages: { + orderBy: { createdAt: "asc" }, + include: { + createdBy: { select: { id: true, name: true, image: true } }, + }, + }, + }, + }); + return conversation; + }), + + startNewConversation: protectedProcedure + .input(z.object({ pipelineId: z.string() })) + .use(withTeamAccess("EDITOR")) + .mutation(async ({ input, ctx }) => { + const conversation = await prisma.aiConversation.create({ + data: { + pipelineId: input.pipelineId, + createdById: ctx.session.user.id, + }, + }); + return conversation; + }), + + markSuggestionsApplied: protectedProcedure + .input( + z.object({ + pipelineId: z.string(), + conversationId: z.string(), + messageId: z.string(), + suggestionIds: z.array(z.string()), + }), + ) + .use(withTeamAccess("EDITOR")) + .mutation(async ({ input, ctx }) => { + const message = await prisma.aiMessage.findUnique({ + where: { id: input.messageId }, + include: { + conversation: { select: { pipelineId: true } }, + }, + }); + + if (!message || message.conversationId !== input.conversationId) { + throw new Error("Message not found in conversation"); + } + + // Mark suggestions as applied in the JSON + const suggestions = (message.suggestions as Array>) ?? []; + const updatedSuggestions = suggestions.map((s) => + input.suggestionIds.includes(s.id as string) + ? { ...s, appliedAt: new Date().toISOString(), appliedById: ctx.session.user.id } + : s, + ); + + await prisma.aiMessage.update({ + where: { id: input.messageId }, + data: { suggestions: updatedSuggestions }, + }); + + // Audit log + const pipeline = await prisma.pipeline.findUnique({ + where: { id: message.conversation.pipelineId }, + select: { environmentId: true, environment: { select: { teamId: true } } }, + }); + + writeAuditLog({ + userId: ctx.session.user.id, + action: "pipeline.ai_suggestion_applied", + entityType: "Pipeline", + entityId: message.conversation.pipelineId, + metadata: { + conversationId: input.conversationId, + messageId: input.messageId, + suggestionIds: input.suggestionIds, + suggestionCount: input.suggestionIds.length, + }, + teamId: pipeline?.environment.teamId ?? null, + environmentId: pipeline?.environmentId ?? null, + userEmail: ctx.session.user.email ?? null, + userName: ctx.session.user.name ?? null, + }).catch(() => {}); + + return { applied: input.suggestionIds.length }; + }), +}); diff --git a/src/trpc/router.ts b/src/trpc/router.ts index dc0871a..3ff2027 100644 --- a/src/trpc/router.ts +++ b/src/trpc/router.ts @@ -20,6 +20,7 @@ import { alertRouter } from "@/server/routers/alert"; import { serviceAccountRouter } from "@/server/routers/service-account"; import { userPreferenceRouter } from "@/server/routers/user-preference"; import { sharedComponentRouter } from "@/server/routers/shared-component"; +import { aiRouter } from "@/server/routers/ai"; export const appRouter = router({ team: teamRouter, @@ -43,6 +44,7 @@ export const appRouter = router({ serviceAccount: serviceAccountRouter, userPreference: userPreferenceRouter, sharedComponent: sharedComponentRouter, + ai: aiRouter, }); export type AppRouter = typeof appRouter; From 561a4133dc3d1c4ad8bc877523431b129476c68f Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:18:41 +0000 Subject: [PATCH 09/25] feat: add batch applySuggestions action to flow store with single undo snapshot --- src/stores/flow-store.ts | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/stores/flow-store.ts b/src/stores/flow-store.ts index 5de01e7..562b1c0 100644 --- a/src/stores/flow-store.ts +++ b/src/stores/flow-store.ts @@ -1,6 +1,8 @@ import { create } from "zustand"; import { generateId } from "@/lib/utils"; import { generateComponentKey } from "@/lib/component-key"; +import { applySuggestion } from "@/lib/ai/suggestion-applier"; +import type { AiSuggestion } from "@/lib/ai/types"; import { type Node, type Edge, @@ -112,6 +114,9 @@ export interface FlowState { // Dirty tracking markClean: () => void; + // AI suggestions + applySuggestions: (suggestions: AiSuggestion[]) => { applied: number; errors: string[] }; + // Serialization loadGraph: (nodes: Node[], edges: Edge[], globalConfig?: Record | null, options?: { isSystem?: boolean }) => void; clearGraph: () => void; @@ -790,6 +795,39 @@ export const useFlowStore = create()((set, get) => ({ set({ isDirty: false, _savedSnapshot: snapshot } as Partial); }, + /* ---- AI suggestions ---- */ + + applySuggestions: (suggestions) => { + const errors: string[] = []; + let applied = 0; + + set((state) => { + // Single undo snapshot for the entire batch + const history = pushSnapshot(state); + let { nodes, edges } = state; + + for (const suggestion of suggestions) { + const result = applySuggestion(suggestion, nodes, edges); + if (result.error) { + errors.push(result.error); + } else { + nodes = result.nodes; + edges = result.edges; + applied++; + } + } + + return { + ...history, + nodes, + edges, + isDirty: applied > 0 ? true : state.isDirty, + }; + }); + + return { applied, errors }; + }, + /* ---- Serialization ---- */ loadGraph: (nodes, edges, globalConfig, options) => { From b06f91b198d8571334bc072e67301406f31b9c30 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:19:46 +0000 Subject: [PATCH 10/25] feat: add conversation persistence and message history to AI pipeline endpoint --- src/app/api/ai/pipeline/route.ts | 111 ++++++++++++++++++++++++++++++- 1 file changed, 110 insertions(+), 1 deletion(-) diff --git a/src/app/api/ai/pipeline/route.ts b/src/app/api/ai/pipeline/route.ts index 29d9386..9dc9247 100644 --- a/src/app/api/ai/pipeline/route.ts +++ b/src/app/api/ai/pipeline/route.ts @@ -4,6 +4,8 @@ import { auth } from "@/auth"; import { prisma } from "@/lib/prisma"; import { streamCompletion } from "@/server/services/ai"; import { buildPipelineSystemPrompt } from "@/lib/ai/prompts"; +import { writeAuditLog } from "@/server/services/audit"; +import type { AiReviewResponse } from "@/lib/ai/types"; export async function POST(request: Request) { const session = await auth(); @@ -20,6 +22,8 @@ export async function POST(request: Request) { mode: "generate" | "review"; currentYaml?: string; environmentName?: string; + pipelineId?: string; + conversationId?: string; }; try { @@ -67,6 +71,60 @@ export async function POST(request: Request) { }); } + // Validate pipelineId for review mode + if (body.mode === "review" && !body.pipelineId) { + return new Response(JSON.stringify({ error: "pipelineId is required for review mode" }), { + status: 400, + headers: { "Content-Type": "application/json" }, + }); + } + + // --- Conversation persistence (review mode only) --- + let conversationId = body.conversationId; + let priorMessages: Array<{ role: "user" | "assistant"; content: string }> = []; + + if (body.mode === "review" && body.pipelineId) { + if (!conversationId) { + const conversation = await prisma.aiConversation.create({ + data: { + pipelineId: body.pipelineId, + createdById: session.user.id, + }, + }); + conversationId = conversation.id; + } + + await prisma.aiMessage.create({ + data: { + conversationId, + role: "user", + content: body.prompt, + pipelineYaml: body.currentYaml ?? null, + createdById: session.user.id, + }, + }); + + // Get most recent 10 messages (desc) then reverse to chronological order + const history = await prisma.aiMessage.findMany({ + where: { conversationId }, + orderBy: { createdAt: "desc" }, + take: 10, + select: { role: true, content: true }, + }); + history.reverse(); + + // Exclude the message we just saved (last user msg) — it goes as the current prompt + priorMessages = history.slice(0, -1).map((m) => ({ + role: m.role as "user" | "assistant", + content: m.content, + })); + } + + const messages: Array<{ role: "user" | "assistant"; content: string }> = [ + ...priorMessages, + { role: "user", content: body.prompt }, + ]; + const mode = body.mode; const systemPrompt = buildPipelineSystemPrompt({ @@ -76,21 +134,72 @@ export async function POST(request: Request) { }); const encoder = new TextEncoder(); + let fullResponse = ""; const stream = new ReadableStream({ async start(controller) { try { + if (conversationId) { + controller.enqueue( + encoder.encode(`data: ${JSON.stringify({ conversationId })}\n\n`) + ); + } + await streamCompletion({ teamId: body.teamId, systemPrompt, - messages: [{ role: "user", content: body.prompt }], + messages, onToken: (token) => { + fullResponse += token; const data = JSON.stringify({ token }); controller.enqueue(encoder.encode(`data: ${data}\n\n`)); }, signal: request.signal, }); controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)); + + if (body.mode === "review" && conversationId) { + let parsedSuggestions = null; + try { + const parsed: AiReviewResponse = JSON.parse(fullResponse); + if (parsed.summary && Array.isArray(parsed.suggestions)) { + parsedSuggestions = parsed.suggestions; + } + } catch { + // Not valid JSON — store as raw text + } + + prisma.aiMessage.create({ + data: { + conversationId, + role: "assistant", + content: fullResponse, + suggestions: parsedSuggestions, + createdById: session.user.id, + }, + }).catch((err) => console.error("Failed to persist AI response:", err)); + + const pipelineForAudit = await prisma.pipeline.findUnique({ + where: { id: body.pipelineId! }, + select: { environmentId: true, environment: { select: { teamId: true } } }, + }); + + writeAuditLog({ + userId: session.user.id, + action: "pipeline.ai_review", + entityType: "Pipeline", + entityId: body.pipelineId!, + metadata: { + conversationId, + mode: body.mode, + suggestionCount: parsedSuggestions?.length ?? 0, + }, + teamId: pipelineForAudit?.environment.teamId ?? null, + environmentId: pipelineForAudit?.environmentId ?? null, + userEmail: session.user.email ?? null, + userName: session.user.name ?? null, + }).catch(() => {}); + } } catch (err) { const message = err instanceof Error ? err.message : "AI request failed"; controller.enqueue( From d343ed43cfedbe6d179e8eb051f7fdbbc8295a6a Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:19:53 +0000 Subject: [PATCH 11/25] feat: add AI suggestion card component with status states and conflict warnings --- src/components/flow/ai-suggestion-card.tsx | 150 +++++++++++++++++++++ src/components/ui/checkbox.tsx | 32 +++++ 2 files changed, 182 insertions(+) create mode 100644 src/components/flow/ai-suggestion-card.tsx create mode 100644 src/components/ui/checkbox.tsx diff --git a/src/components/flow/ai-suggestion-card.tsx b/src/components/flow/ai-suggestion-card.tsx new file mode 100644 index 0000000..10a9e00 --- /dev/null +++ b/src/components/flow/ai-suggestion-card.tsx @@ -0,0 +1,150 @@ +"use client"; + +import { Checkbox } from "@/components/ui/checkbox"; +import { Badge } from "@/components/ui/badge"; +import { cn } from "@/lib/utils"; +import type { AiSuggestion, SuggestionStatus } from "@/lib/ai/types"; +import { AlertTriangle } from "lucide-react"; + +interface AiSuggestionCardProps { + suggestion: AiSuggestion; + status: SuggestionStatus; + isSelected: boolean; + hasConflict: boolean; + conflictReason?: string; + onToggle: (id: string) => void; +} + +const TYPE_LABELS: Record = { + modify_config: "Config Change", + add_component: "Add Component", + remove_component: "Remove Component", + modify_connections: "Rewire", +}; + +const PRIORITY_COLORS: Record = { + high: "bg-red-500/15 text-red-700 dark:text-red-400", + medium: "bg-amber-500/15 text-amber-700 dark:text-amber-400", + low: "bg-green-500/15 text-green-700 dark:text-green-400", +}; + +const STATUS_BADGES: Partial> = { + applied: { label: "Applied", className: "bg-green-500/15 text-green-700 dark:text-green-400" }, + outdated: { label: "Outdated", className: "bg-amber-500/15 text-amber-700 dark:text-amber-400" }, + invalid: { label: "Invalid", className: "bg-red-500/15 text-red-700 dark:text-red-400" }, +}; + +export function AiSuggestionCard({ + suggestion, + status, + isSelected, + hasConflict, + conflictReason, + onToggle, +}: AiSuggestionCardProps) { + const isDisabled = status === "applied" || status === "invalid"; + const statusBadge = STATUS_BADGES[status]; + + return ( +
+
+ onToggle(suggestion.id)} + className="mt-0.5" + /> + +
+
+ + {suggestion.title} + + + {statusBadge && ( + + {statusBadge.label} + + )} + + + {suggestion.priority} + + + + {TYPE_LABELS[suggestion.type]} + +
+ +

+ {renderDescription(suggestion)} +

+ + {suggestion.type === "modify_config" && ( +
+ {Object.entries(suggestion.changes).map(([key, value]) => ( +
+ {key}:{" "} + {JSON.stringify(value)} +
+ ))} +
+ )} + + {hasConflict && conflictReason && ( +
+ + {conflictReason} +
+ )} +
+
+
+ ); +} + +function renderDescription(suggestion: AiSuggestion): React.ReactNode { + const desc = suggestion.description; + + // Highlight componentKey references in the description + const componentKeys: string[] = []; + if (suggestion.type === "modify_config" || suggestion.type === "remove_component") { + componentKeys.push(suggestion.componentKey); + } + if (suggestion.type === "add_component") { + componentKeys.push(suggestion.insertAfter, ...suggestion.connectTo); + } + if (suggestion.type === "modify_connections") { + for (const e of suggestion.edgeChanges) { + componentKeys.push(e.from, e.to); + } + } + + if (componentKeys.length === 0) return desc; + + const uniqueKeys = [...new Set(componentKeys)]; + const pattern = new RegExp(`(${uniqueKeys.map(k => k.replace(/[.*+?^${}()|[\]\\]/g, "\\$&")).join("|")})`, "g"); + const parts = desc.split(pattern); + + return parts.map((part, i) => + uniqueKeys.includes(part) ? ( + + {part} + + ) : ( + part + ), + ); +} diff --git a/src/components/ui/checkbox.tsx b/src/components/ui/checkbox.tsx new file mode 100644 index 0000000..f5a7e43 --- /dev/null +++ b/src/components/ui/checkbox.tsx @@ -0,0 +1,32 @@ +"use client" + +import * as React from "react" +import { CheckIcon } from "lucide-react" +import { Checkbox as CheckboxPrimitive } from "radix-ui" + +import { cn } from "@/lib/utils" + +function Checkbox({ + className, + ...props +}: React.ComponentProps) { + return ( + + + + + + ) +} + +export { Checkbox } From c00253b1921ac492722bdc03a9f044f9ac1a6613 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:22:26 +0000 Subject: [PATCH 12/25] feat: add useAiConversation hook for conversation management and streaming --- src/hooks/use-ai-conversation.ts | 221 +++++++++++++++++++++++++++++++ 1 file changed, 221 insertions(+) create mode 100644 src/hooks/use-ai-conversation.ts diff --git a/src/hooks/use-ai-conversation.ts b/src/hooks/use-ai-conversation.ts new file mode 100644 index 0000000..7a2b9ad --- /dev/null +++ b/src/hooks/use-ai-conversation.ts @@ -0,0 +1,221 @@ +"use client"; + +import { useState, useRef, useCallback } from "react"; +import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; +import { useTRPC } from "@/trpc/client"; +import { useTeamStore } from "@/stores/team-store"; +import type { AiSuggestion, AiReviewResponse } from "@/lib/ai/types"; +import { parseAiReviewResponse } from "@/lib/ai/suggestion-validator"; + +export interface ConversationMessage { + id: string; + role: "user" | "assistant"; + content: string; + suggestions?: AiSuggestion[]; + pipelineYaml?: string | null; + createdAt: string; + createdBy?: { id: string; name: string | null; image: string | null } | null; +} + +interface UseAiConversationOptions { + pipelineId: string; + currentYaml?: string; + environmentName?: string; +} + +export function useAiConversation({ + pipelineId, + currentYaml, + environmentName, +}: UseAiConversationOptions) { + const trpc = useTRPC(); + const queryClient = useQueryClient(); + const selectedTeamId = useTeamStore((s) => s.selectedTeamId); + + const [messages, setMessages] = useState([]); + const [conversationId, setConversationId] = useState(null); + const [isStreaming, setIsStreaming] = useState(false); + const [streamingContent, setStreamingContent] = useState(""); + const [error, setError] = useState(null); + const abortRef = useRef(null); + + // Load existing conversation + const conversationQuery = useQuery({ + ...trpc.ai.getConversation.queryOptions({ pipelineId }), + enabled: !!pipelineId, + }); + + // Sync loaded conversation into local state + const loadedConversation = conversationQuery.data; + if (loadedConversation && !conversationId && messages.length === 0 && !isStreaming) { + setConversationId(loadedConversation.id); + setMessages( + loadedConversation.messages.map((m) => ({ + id: m.id, + role: m.role as "user" | "assistant", + content: m.content, + suggestions: m.suggestions as unknown as AiSuggestion[] | undefined, + pipelineYaml: m.pipelineYaml, + createdAt: m.createdAt instanceof Date ? m.createdAt.toISOString() : String(m.createdAt), + createdBy: m.createdBy, + })), + ); + } + + const markAppliedMutation = useMutation( + trpc.ai.markSuggestionsApplied.mutationOptions({ + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: trpc.ai.getConversation.queryKey({ pipelineId }) }); + }, + }), + ); + + const sendReview = useCallback( + async (prompt: string) => { + if (!prompt.trim() || !selectedTeamId || isStreaming) return; + + setIsStreaming(true); + setStreamingContent(""); + setError(null); + + // Add optimistic user message + const userMessage: ConversationMessage = { + id: `temp-user-${Date.now()}`, + role: "user", + content: prompt.trim(), + pipelineYaml: currentYaml ?? null, + createdAt: new Date().toISOString(), + }; + setMessages((prev) => [...prev, userMessage]); + + abortRef.current = new AbortController(); + let fullResponse = ""; + + try { + const response = await fetch("/api/ai/pipeline", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + teamId: selectedTeamId, + prompt: prompt.trim(), + mode: "review", + currentYaml, + environmentName, + pipelineId, + conversationId, + }), + signal: abortRef.current.signal, + }); + + if (!response.ok) { + const errData = await response.json().catch(() => ({ error: "Request failed" })); + throw new Error(errData.error || `HTTP ${response.status}`); + } + + const reader = response.body?.getReader(); + if (!reader) throw new Error("No response stream"); + + const decoder = new TextDecoder(); + let buffer = ""; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed || !trimmed.startsWith("data: ")) continue; + + try { + const data = JSON.parse(trimmed.slice(6)); + if (data.conversationId) { + setConversationId(data.conversationId); + continue; + } + if (data.done) break; + if (data.error) throw new Error(data.error); + if (data.token) { + fullResponse += data.token; + setStreamingContent(fullResponse); + } + } catch (parseErr) { + if (parseErr instanceof Error && parseErr.message !== "Unexpected end of JSON input") { + throw parseErr; + } + } + } + } + + // Parse the completed response + const parsed = parseAiReviewResponse(fullResponse); + + const assistantMessage: ConversationMessage = { + id: `temp-assistant-${Date.now()}`, + role: "assistant", + content: fullResponse, + suggestions: parsed?.suggestions, + pipelineYaml: currentYaml ?? null, + createdAt: new Date().toISOString(), + }; + setMessages((prev) => [...prev, assistantMessage]); + setStreamingContent(""); + + // Invalidate to sync with server-persisted messages + queryClient.invalidateQueries({ queryKey: trpc.ai.getConversation.queryKey({ pipelineId }) }); + } catch (err) { + if (err instanceof Error && err.name === "AbortError") return; + setError(err instanceof Error ? err.message : "AI request failed"); + } finally { + setIsStreaming(false); + abortRef.current = null; + } + }, + [selectedTeamId, isStreaming, currentYaml, environmentName, pipelineId, conversationId, queryClient, trpc], + ); + + const startNewConversation = useCallback(() => { + setMessages([]); + setConversationId(null); + setStreamingContent(""); + setError(null); + }, []); + + const markSuggestionsApplied = useCallback( + (messageId: string, suggestionIds: string[]) => { + if (!conversationId) return; + + // The server records which suggestions were applied in the JSON; + // we rely on the query invalidation in markAppliedMutation.onSuccess + // to refresh the conversation state. + + markAppliedMutation.mutate({ + pipelineId, + conversationId, + messageId, + suggestionIds, + }); + }, + [conversationId, pipelineId, markAppliedMutation], + ); + + const cancelStreaming = useCallback(() => { + abortRef.current?.abort(); + }, []); + + return { + messages, + conversationId, + isStreaming, + streamingContent, + error, + isLoading: conversationQuery.isLoading, + sendReview, + startNewConversation, + markSuggestionsApplied, + cancelStreaming, + }; +} From c3e9e1ac3ba0c8e2d71f7c08668674561f8e4751 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:22:32 +0000 Subject: [PATCH 13/25] feat: add AI message bubble with suggestion cards, selection, and batch apply --- src/components/flow/ai-message-bubble.tsx | 158 ++++++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 src/components/flow/ai-message-bubble.tsx diff --git a/src/components/flow/ai-message-bubble.tsx b/src/components/flow/ai-message-bubble.tsx new file mode 100644 index 0000000..d3b205c --- /dev/null +++ b/src/components/flow/ai-message-bubble.tsx @@ -0,0 +1,158 @@ +"use client"; + +import { useState, useMemo } from "react"; +import { Bot, User } from "lucide-react"; +import { Button } from "@/components/ui/button"; +import { cn } from "@/lib/utils"; +import { AiSuggestionCard } from "./ai-suggestion-card"; +import { detectConflicts } from "@/lib/ai/conflict-detector"; +import type { AiSuggestion, SuggestionStatus } from "@/lib/ai/types"; +import type { ConversationMessage } from "@/hooks/use-ai-conversation"; + +interface AiMessageBubbleProps { + message: ConversationMessage; + suggestionStatuses: Map; + onApplySelected: (messageId: string, suggestions: AiSuggestion[]) => void; +} + +export function AiMessageBubble({ + message, + suggestionStatuses, + onApplySelected, +}: AiMessageBubbleProps) { + const [selectedIds, setSelectedIds] = useState>(new Set()); + + const suggestions = message.suggestions ?? []; + const hasSuggestions = message.role === "assistant" && suggestions.length > 0; + + // Parse summary from assistant JSON content + const summary = useMemo(() => { + if (message.role !== "assistant") return null; + if (!hasSuggestions) return null; + try { + const parsed = JSON.parse(message.content); + return parsed.summary as string | undefined; + } catch { + return null; + } + }, [message.content, message.role, hasSuggestions]); + + // Detect conflicts among selected suggestions + const conflicts = useMemo(() => { + const selected = suggestions.filter((s) => selectedIds.has(s.id)); + return detectConflicts(selected); + }, [suggestions, selectedIds]); + + const conflictMap = useMemo(() => { + const map = new Map(); + for (const c of conflicts) { + map.set(c.a, c.reason); + map.set(c.b, c.reason); + } + return map; + }, [conflicts]); + + const actionableSuggestions = suggestions.filter( + (s) => suggestionStatuses.get(s.id) === "actionable", + ); + + const selectedSuggestions = suggestions.filter((s) => selectedIds.has(s.id)); + + const handleToggle = (id: string) => { + setSelectedIds((prev) => { + const next = new Set(prev); + if (next.has(id)) { + next.delete(id); + } else { + next.add(id); + } + return next; + }); + }; + + const handleApplyAll = () => { + if (actionableSuggestions.length > 0) { + onApplySelected(message.id, actionableSuggestions); + } + }; + + const handleApplySelected = () => { + if (selectedSuggestions.length > 0) { + onApplySelected(message.id, selectedSuggestions); + } + }; + + if (message.role === "user") { + return ( +
+
+ +
+
+

{message.content}

+
+
+ ); + } + + // Assistant message + if (!hasSuggestions) { + // Fallback: render raw text content + return ( +
+
+ +
+
+
{message.content}
+
+
+ ); + } + + return ( +
+
+ +
+
+ {summary && ( +

{summary}

+ )} + +
+ {suggestions.map((s) => ( + + ))} +
+ + {actionableSuggestions.length > 0 && ( +
+ + +
+ )} +
+
+ ); +} From aef7de4f0635ad1457aa5c642ab35c6609eb80fd Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:24:19 +0000 Subject: [PATCH 14/25] feat: rewrite AI review dialog with conversation thread and actionable suggestion cards --- src/app/(dashboard)/pipelines/[id]/page.tsx | 1 + src/components/flow/ai-pipeline-dialog.tsx | 348 ++++++++++++++------ 2 files changed, 255 insertions(+), 94 deletions(-) diff --git a/src/app/(dashboard)/pipelines/[id]/page.tsx b/src/app/(dashboard)/pipelines/[id]/page.tsx index 1bebf06..fe2d3dd 100644 --- a/src/app/(dashboard)/pipelines/[id]/page.tsx +++ b/src/app/(dashboard)/pipelines/[id]/page.tsx @@ -540,6 +540,7 @@ function PipelineBuilderInner({ pipelineId }: { pipelineId: string }) { )} diff --git a/src/components/flow/ai-pipeline-dialog.tsx b/src/components/flow/ai-pipeline-dialog.tsx index ad9a121..3172715 100644 --- a/src/components/flow/ai-pipeline-dialog.tsx +++ b/src/components/flow/ai-pipeline-dialog.tsx @@ -1,8 +1,8 @@ // src/components/flow/ai-pipeline-dialog.tsx "use client"; -import { useState, useRef, useCallback } from "react"; -import { Loader2, RotateCcw, Sparkles, AlertTriangle } from "lucide-react"; +import { useState, useRef, useCallback, useMemo, useEffect } from "react"; +import { Loader2, RotateCcw, Sparkles, AlertTriangle, MessageSquarePlus } from "lucide-react"; import { Button } from "@/components/ui/button"; import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; @@ -14,48 +14,112 @@ import { DialogTitle, } from "@/components/ui/dialog"; import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"; +import { ScrollArea } from "@/components/ui/scroll-area"; import { useTeamStore } from "@/stores/team-store"; import { useFlowStore } from "@/stores/flow-store"; import { generateVectorYaml, importVectorConfig } from "@/lib/config-generator"; import { toast } from "sonner"; +import { useAiConversation } from "@/hooks/use-ai-conversation"; +import { AiMessageBubble } from "./ai-message-bubble"; +import { validateSuggestions } from "@/lib/ai/suggestion-validator"; +import { detectOutdatedSuggestions } from "@/lib/ai/suggestion-validator"; +import type { AiSuggestion, SuggestionStatus } from "@/lib/ai/types"; interface AiPipelineDialogProps { open: boolean; onOpenChange: (open: boolean) => void; + pipelineId: string; environmentName?: string; } export function AiPipelineDialog({ open, onOpenChange, + pipelineId, environmentName, }: AiPipelineDialogProps) { - const [prompt, setPrompt] = useState(""); - const [result, setResult] = useState(""); - const [isStreaming, setIsStreaming] = useState(false); - const [error, setError] = useState(null); const [mode, setMode] = useState<"generate" | "review">("generate"); - const abortRef = useRef(null); + + // --- Generate tab state (unchanged from original) --- + const [genPrompt, setGenPrompt] = useState(""); + const [genResult, setGenResult] = useState(""); + const [genIsStreaming, setGenIsStreaming] = useState(false); + const [genError, setGenError] = useState(null); + const genAbortRef = useRef(null); const selectedTeamId = useTeamStore((s) => s.selectedTeamId); const nodes = useFlowStore((s) => s.nodes); const edges = useFlowStore((s) => s.edges); const globalConfig = useFlowStore((s) => s.globalConfig); const loadGraph = useFlowStore((s) => s.loadGraph); + const applySuggestions = useFlowStore((s) => s.applySuggestions); const currentYaml = nodes.length > 0 ? generateVectorYaml(nodes, edges, globalConfig) : undefined; - const handleSubmit = useCallback( + // --- Review tab state --- + const [reviewPrompt, setReviewPrompt] = useState(""); + const conversation = useAiConversation({ + pipelineId, + currentYaml, + environmentName, + }); + const messagesEndRef = useRef(null); + + // Auto-scroll to bottom when messages change + useEffect(() => { + messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }); + }, [conversation.messages, conversation.streamingContent]); + + // Compute suggestion statuses across all messages + const suggestionStatuses = useMemo(() => { + const statuses = new Map(); + + for (const msg of conversation.messages) { + if (msg.role !== "assistant" || !msg.suggestions) continue; + + // Validate references against current canvas + const validation = validateSuggestions(msg.suggestions, nodes); + for (const [id, status] of validation) { + statuses.set(id, status); + } + + // Check for outdated suggestions + const outdated = detectOutdatedSuggestions( + msg.suggestions, + msg.pipelineYaml ?? null, + currentYaml ?? "", + ); + for (const id of outdated) { + if (statuses.get(id) === "actionable") { + statuses.set(id, "outdated"); + } + } + + // Check for already-applied suggestions (from server data) + for (const s of msg.suggestions) { + const raw = s as unknown as Record; + if (raw.appliedAt) { + statuses.set(s.id, "applied"); + } + } + } + + return statuses; + }, [conversation.messages, nodes, currentYaml]); + + // --- Generate tab handlers (identical to original) --- + + const handleGenSubmit = useCallback( async (e?: React.FormEvent) => { e?.preventDefault(); - if (!prompt.trim() || !selectedTeamId || isStreaming) return; + if (!genPrompt.trim() || !selectedTeamId || genIsStreaming) return; - setIsStreaming(true); - setResult(""); - setError(null); + setGenIsStreaming(true); + setGenResult(""); + setGenError(null); - abortRef.current = new AbortController(); + genAbortRef.current = new AbortController(); try { const response = await fetch("/api/ai/pipeline", { @@ -63,12 +127,12 @@ export function AiPipelineDialog({ headers: { "Content-Type": "application/json" }, body: JSON.stringify({ teamId: selectedTeamId, - prompt: prompt.trim(), - mode, - currentYaml: mode === "review" ? currentYaml : undefined, + prompt: genPrompt.trim(), + mode: "generate", + currentYaml: undefined, environmentName, }), - signal: abortRef.current.signal, + signal: genAbortRef.current.signal, }); if (!response.ok) { @@ -99,7 +163,7 @@ export function AiPipelineDialog({ if (data.done) break; if (data.error) throw new Error(data.error); if (data.token) { - setResult((prev) => prev + data.token); + setGenResult((prev) => prev + data.token); } } catch (parseErr) { if (parseErr instanceof Error && parseErr.message !== "Unexpected end of JSON input") { @@ -110,19 +174,18 @@ export function AiPipelineDialog({ } } catch (err) { if (err instanceof Error && err.name === "AbortError") return; - setError(err instanceof Error ? err.message : "AI request failed"); + setGenError(err instanceof Error ? err.message : "AI request failed"); } finally { - setIsStreaming(false); - abortRef.current = null; + setGenIsStreaming(false); + genAbortRef.current = null; } }, - [prompt, selectedTeamId, currentYaml, mode, environmentName, isStreaming], + [genPrompt, selectedTeamId, environmentName, genIsStreaming], ); const handleApplyToCanvas = () => { try { - // Strip any markdown fencing the LLM might have added - let yaml = result.trim(); + let yaml = genResult.trim(); if (yaml.startsWith("```yaml")) yaml = yaml.slice(7); if (yaml.startsWith("```")) yaml = yaml.slice(3); if (yaml.endsWith("```")) yaml = yaml.slice(0, -3); @@ -132,10 +195,8 @@ export function AiPipelineDialog({ importVectorConfig(yaml); if (nodes.length === 0) { - // Empty pipeline: replace loadGraph(newNodes, newEdges, importedGlobalConfig); } else { - // Existing pipeline: add alongside (offset positions to avoid overlap) const maxY = Math.max(...nodes.map((n) => n.position.y), 0); const offsetNodes = newNodes.map((n) => ({ ...n, @@ -149,8 +210,8 @@ export function AiPipelineDialog({ toast.success(`Applied ${newNodes.length} components to canvas`); onOpenChange(false); - setResult(""); - setPrompt(""); + setGenResult(""); + setGenPrompt(""); } catch (err) { toast.error("Failed to parse YAML", { description: err instanceof Error ? err.message : "Invalid YAML output", @@ -158,13 +219,46 @@ export function AiPipelineDialog({ } }; - const handleCancel = () => { - abortRef.current?.abort(); + const handleGenCancel = () => { + genAbortRef.current?.abort(); }; + // --- Review tab handlers --- + + const handleReviewSubmit = useCallback( + (e?: React.FormEvent) => { + e?.preventDefault(); + if (!reviewPrompt.trim()) return; + const prompt = reviewPrompt; + setReviewPrompt(""); + conversation.sendReview(prompt); + }, + [reviewPrompt, conversation], + ); + + const handleApplySelected = useCallback( + (messageId: string, suggestions: AiSuggestion[]) => { + const { applied, errors } = applySuggestions(suggestions); + + if (applied > 0) { + toast.success(`Applied ${applied} suggestion${applied > 1 ? "s" : ""} to canvas`); + conversation.markSuggestionsApplied( + messageId, + suggestions.map((s) => s.id), + ); + } + if (errors.length > 0) { + toast.error(`${errors.length} suggestion${errors.length > 1 ? "s" : ""} failed`, { + description: errors[0], + }); + } + }, + [applySuggestions, conversation], + ); + return ( - + @@ -175,7 +269,7 @@ export function AiPipelineDialog({ - setMode(v as "generate" | "review")}> + setMode(v as "generate" | "review")} className="flex flex-col flex-1 overflow-hidden"> Generate @@ -183,88 +277,154 @@ export function AiPipelineDialog({ - + {/* ---- Generate tab (unchanged) ---- */} +
-
+ setPrompt(e.target.value)} + value={genPrompt} + onChange={(e) => setGenPrompt(e.target.value)} placeholder="Collect K8s logs, drop debug, send to Datadog and S3" - disabled={isStreaming} + disabled={genIsStreaming} /> - {isStreaming ? ( - ) : ( - )}
-
- -
- -
- setPrompt(e.target.value)} - placeholder="Is my pipeline config optimal? Any issues?" - disabled={isStreaming} - /> - {isStreaming ? ( - - ) : ( - + {genError && ( +
+ + {genError} +
+ )} + + {(genResult || genIsStreaming) && ( +
+ +
+ {genResult || ( + + + Generating pipeline... + + )} +
+ {!genIsStreaming && genResult && ( +
+ + +
)} - -
+
+ )}
-
- {error && ( -
- - {error} -
- )} - - {(result || isStreaming) && ( -
- -
- {result || ( - - - {mode === "generate" ? "Generating pipeline..." : "Reviewing pipeline..."} - - )} -
- {!isStreaming && result && ( -
- {mode === "generate" && ( - - )} - + {/* ---- Review tab (conversation thread) ---- */} + + {conversation.isLoading ? ( +
+
+ ) : ( + <> + {/* Message thread */} + +
+ {conversation.messages.length === 0 && !conversation.isStreaming && ( +

+ Ask the AI to review your pipeline configuration. +

+ )} + + {conversation.messages.map((msg) => ( + + ))} + + {conversation.isStreaming && conversation.streamingContent && ( +
+
+ +
+
+
+ {conversation.streamingContent} +
+
+
+ )} + + {conversation.isStreaming && !conversation.streamingContent && ( +
+ + Analyzing pipeline... +
+ )} + +
+
+ + + {conversation.error && ( +
+ + {conversation.error} +
+ )} + + {/* Input pinned at bottom */} +
+
+ setReviewPrompt(e.target.value)} + placeholder="Ask about your pipeline..." + disabled={conversation.isStreaming} + /> + {conversation.isStreaming ? ( + + ) : ( + + )} +
+ {conversation.messages.length > 0 && ( + + )} +
+ )} -
- )} +
+
); From 6768de4ac0e32078251b8b7d8328154404428b0c Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:25:03 +0000 Subject: [PATCH 15/25] docs: update AI suggestions guide with actionable review cards --- docs/public/user-guide/ai-suggestions.md | 39 +++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/docs/public/user-guide/ai-suggestions.md b/docs/public/user-guide/ai-suggestions.md index e04241c..5c95ac2 100644 --- a/docs/public/user-guide/ai-suggestions.md +++ b/docs/public/user-guide/ai-suggestions.md @@ -50,7 +50,44 @@ Ask the AI to analyze your current pipeline configuration: > "Are there any performance issues with my pipeline?" -The AI reviews the generated YAML and provides suggestions for improvements, best practices, and potential issues. +The AI returns structured, actionable suggestion cards that you can selectively apply to your canvas. + +#### Suggestion cards + +Each suggestion appears as an interactive card showing: + +- **Title** and **description** explaining why the change helps +- **Priority badge** (High, Medium, Low) +- **Type badge** — Config Change, Add Component, Remove Component, or Rewire +- **Checkbox** for batch selection +- **Config preview** for configuration changes showing the exact fields that will be modified + +#### Applying suggestions + +- **Apply All** — applies every actionable suggestion from that AI response +- **Apply Selected** — applies only the suggestions you have checked + +Applied suggestions are marked with a green "Applied" badge and cannot be re-applied. The entire batch is a single undo operation — press **Ctrl+Z** (or **Cmd+Z**) to revert all changes at once. + +#### Conflict detection + +When you select multiple suggestions that conflict (e.g., two suggestions modifying the same config field, or one removing a component that another references), an amber warning appears on the affected cards explaining the conflict. You can still apply conflicting suggestions, but review the warnings first. + +#### Suggestion statuses + +| Status | Meaning | +|--------|---------| +| **Actionable** | Ready to apply | +| **Applied** | Already applied to the canvas | +| **Outdated** | The pipeline changed since this suggestion was made | +| **Invalid** | References a component that no longer exists on the canvas | + +#### Conversations + +Review conversations are persistent — they are saved per pipeline and visible to all team members with access. You can: + +- **Ask follow-up questions** using the input at the bottom of the dialog +- **Start a new conversation** by clicking "New Conversation" below the input ## Rate Limits From a151e7a7018a3641b91da779a04db27abfef0b52 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:29:06 +0000 Subject: [PATCH 16/25] fix: resolve Prisma Json type errors and lint issues in AI suggestion backend --- src/app/api/ai/pipeline/route.ts | 3 ++- src/lib/ai/suggestion-applier.ts | 2 +- src/server/routers/ai.ts | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/app/api/ai/pipeline/route.ts b/src/app/api/ai/pipeline/route.ts index 9dc9247..a7e6eac 100644 --- a/src/app/api/ai/pipeline/route.ts +++ b/src/app/api/ai/pipeline/route.ts @@ -6,6 +6,7 @@ import { streamCompletion } from "@/server/services/ai"; import { buildPipelineSystemPrompt } from "@/lib/ai/prompts"; import { writeAuditLog } from "@/server/services/audit"; import type { AiReviewResponse } from "@/lib/ai/types"; +import { Prisma } from "@/generated/prisma"; export async function POST(request: Request) { const session = await auth(); @@ -174,7 +175,7 @@ export async function POST(request: Request) { conversationId, role: "assistant", content: fullResponse, - suggestions: parsedSuggestions, + suggestions: (parsedSuggestions as unknown as Prisma.InputJsonValue) ?? undefined, createdById: session.user.id, }, }).catch((err) => console.error("Failed to persist AI response:", err)); diff --git a/src/lib/ai/suggestion-applier.ts b/src/lib/ai/suggestion-applier.ts index 257bf79..7e80e72 100644 --- a/src/lib/ai/suggestion-applier.ts +++ b/src/lib/ai/suggestion-applier.ts @@ -135,7 +135,7 @@ function applyRemoveComponent( const incomingEdges = edges.filter((e) => e.target === target.id); const outgoingEdges = edges.filter((e) => e.source === target.id); - let newEdges = edges.filter((e) => e.source !== target.id && e.target !== target.id); + const newEdges = edges.filter((e) => e.source !== target.id && e.target !== target.id); const newNodes = nodes.filter((n) => n.id !== target.id); if (suggestion.reconnect) { diff --git a/src/server/routers/ai.ts b/src/server/routers/ai.ts index 39ff7cb..fe94a79 100644 --- a/src/server/routers/ai.ts +++ b/src/server/routers/ai.ts @@ -2,6 +2,7 @@ import { z } from "zod"; import { router, protectedProcedure, withTeamAccess } from "@/trpc/init"; import { prisma } from "@/lib/prisma"; import { writeAuditLog } from "@/server/services/audit"; +import { Prisma } from "@/generated/prisma"; export const aiRouter = router({ getConversation: protectedProcedure @@ -68,7 +69,7 @@ export const aiRouter = router({ await prisma.aiMessage.update({ where: { id: input.messageId }, - data: { suggestions: updatedSuggestions }, + data: { suggestions: updatedSuggestions as unknown as Prisma.InputJsonValue }, }); // Audit log From c83d37c6dca1b0d50ce26433714d5afda109278e Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Tue, 10 Mar 2026 21:30:31 +0000 Subject: [PATCH 17/25] fix: resolve lint warnings in AI conversation components --- src/components/flow/ai-message-bubble.tsx | 3 +-- src/hooks/use-ai-conversation.ts | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/components/flow/ai-message-bubble.tsx b/src/components/flow/ai-message-bubble.tsx index d3b205c..d4113f3 100644 --- a/src/components/flow/ai-message-bubble.tsx +++ b/src/components/flow/ai-message-bubble.tsx @@ -3,7 +3,6 @@ import { useState, useMemo } from "react"; import { Bot, User } from "lucide-react"; import { Button } from "@/components/ui/button"; -import { cn } from "@/lib/utils"; import { AiSuggestionCard } from "./ai-suggestion-card"; import { detectConflicts } from "@/lib/ai/conflict-detector"; import type { AiSuggestion, SuggestionStatus } from "@/lib/ai/types"; @@ -22,7 +21,7 @@ export function AiMessageBubble({ }: AiMessageBubbleProps) { const [selectedIds, setSelectedIds] = useState>(new Set()); - const suggestions = message.suggestions ?? []; + const suggestions = useMemo(() => message.suggestions ?? [], [message.suggestions]); const hasSuggestions = message.role === "assistant" && suggestions.length > 0; // Parse summary from assistant JSON content diff --git a/src/hooks/use-ai-conversation.ts b/src/hooks/use-ai-conversation.ts index 7a2b9ad..f3bfc37 100644 --- a/src/hooks/use-ai-conversation.ts +++ b/src/hooks/use-ai-conversation.ts @@ -4,7 +4,7 @@ import { useState, useRef, useCallback } from "react"; import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; import { useTRPC } from "@/trpc/client"; import { useTeamStore } from "@/stores/team-store"; -import type { AiSuggestion, AiReviewResponse } from "@/lib/ai/types"; +import type { AiSuggestion } from "@/lib/ai/types"; import { parseAiReviewResponse } from "@/lib/ai/suggestion-validator"; export interface ConversationMessage { From 227d529cbed3786dd5a9469dd99225d9c505f954 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 00:10:50 +0000 Subject: [PATCH 18/25] fix: hash bearer tokens in WS auth cache to prevent credential exposure --- src/server/services/ws-auth.ts | 67 ++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 src/server/services/ws-auth.ts diff --git a/src/server/services/ws-auth.ts b/src/server/services/ws-auth.ts new file mode 100644 index 0000000..a6b7571 --- /dev/null +++ b/src/server/services/ws-auth.ts @@ -0,0 +1,67 @@ +import { createHash } from "crypto"; +import type { IncomingMessage } from "http"; +import { prisma } from "@/lib/prisma"; +import { extractBearerToken, verifyNodeToken } from "./agent-token"; + +/** Derive a fast, non-reversible cache key from a plaintext token. */ +function tokenCacheKey(token: string): string { + return createHash("sha256").update(token).digest("hex"); +} + +/** Cache verified tokens to avoid O(n) bcrypt scan on every WS upgrade. + * Key: SHA-256 hash of token, Value: { nodeId, environmentId }. + * Entries are evicted when the token fails verification (node re-enrolled). */ +const tokenCache = new Map(); + +/** + * Authenticate a WebSocket upgrade request by verifying its Bearer token. + * + * Uses an in-memory cache so reconnects (same token) are O(1) instead of + * scanning all node hashes with bcrypt. + */ +export async function authenticateWsUpgrade( + req: IncomingMessage, +): Promise<{ nodeId: string; environmentId: string } | null> { + const authHeader = req.headers["authorization"]; + const token = extractBearerToken( + Array.isArray(authHeader) ? authHeader[0] : authHeader ?? null, + ); + if (!token) { + return null; + } + + const cacheKey = tokenCacheKey(token); + + // Fast path: check cache first (O(1) string lookup) + const cached = tokenCache.get(cacheKey); + if (cached) { + // Verify the node still exists and the hash still matches (re-enrollment invalidates) + const node = await prisma.vectorNode.findUnique({ + where: { id: cached.nodeId }, + select: { nodeTokenHash: true }, + }); + if (node?.nodeTokenHash && await verifyNodeToken(token, node.nodeTokenHash)) { + return cached; + } + // Cache stale — node deleted or re-enrolled + tokenCache.delete(cacheKey); + } + + // Slow path: scan all nodes with bcrypt + const nodes = await prisma.vectorNode.findMany({ + where: { nodeTokenHash: { not: null } }, + select: { id: true, environmentId: true, nodeTokenHash: true }, + }); + + for (const node of nodes) { + if (!node.nodeTokenHash) continue; + const valid = await verifyNodeToken(token, node.nodeTokenHash); + if (valid) { + const result = { nodeId: node.id, environmentId: node.environmentId }; + tokenCache.set(cacheKey, result); + return result; + } + } + + return null; +} From d4e75ae892d661442005132123e01a752613bb6c Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 00:11:15 +0000 Subject: [PATCH 19/25] fix: reset poll ticker when receiving poll_interval from server --- agent/internal/agent/agent.go | 21 ++++++++++++++++++++- agent/internal/agent/poller.go | 9 +++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/agent/internal/agent/agent.go b/agent/internal/agent/agent.go index ac98d72..91f839a 100644 --- a/agent/internal/agent/agent.go +++ b/agent/internal/agent/agent.go @@ -78,12 +78,14 @@ func (a *Agent) Run() error { }() // Main loop: poll + heartbeat - ticker := time.NewTicker(a.cfg.PollInterval) + currentInterval := a.cfg.PollInterval + ticker := time.NewTicker(currentInterval) defer ticker.Stop() // Do first poll immediately a.pollAndApply() a.sendHeartbeat() + currentInterval = a.maybeResetTicker(ticker, currentInterval) for { select { @@ -95,10 +97,27 @@ func (a *Agent) Run() error { case <-ticker.C: a.pollAndApply() a.sendHeartbeat() + currentInterval = a.maybeResetTicker(ticker, currentInterval) } } } +// maybeResetTicker checks if the server provided a new poll interval and resets +// the ticker if it changed. Returns the (possibly updated) current interval. +func (a *Agent) maybeResetTicker(ticker *time.Ticker, current time.Duration) time.Duration { + serverMs := a.poller.PollIntervalMs() + if serverMs <= 0 { + return current + } + serverInterval := time.Duration(serverMs) * time.Millisecond + if serverInterval != current { + slog.Info("poll interval updated by server", "old", current, "new", serverInterval) + ticker.Reset(serverInterval) + return serverInterval + } + return current +} + func (a *Agent) pollAndApply() { actions, err := a.poller.Poll() if err != nil { diff --git a/agent/internal/agent/poller.go b/agent/internal/agent/poller.go index c4d5410..3235d2b 100644 --- a/agent/internal/agent/poller.go +++ b/agent/internal/agent/poller.go @@ -27,6 +27,7 @@ type poller struct { known map[string]pipelineState // pipelineId -> last known state sampleRequests []client.SampleRequestMsg pendingAction *client.PendingAction + pollIntervalMs int // server-provided poll interval from last response } func newPoller(cfg *config.Config, c configFetcher) *poller { @@ -184,6 +185,9 @@ func (p *poller) Poll() ([]PipelineAction, error) { // Store pending action (e.g. self-update) for the agent to handle p.pendingAction = resp.PendingAction + // Store server-provided poll interval + p.pollIntervalMs = resp.PollIntervalMs + return actions, nil } @@ -196,3 +200,8 @@ func (p *poller) SampleRequests() []client.SampleRequestMsg { func (p *poller) PendingAction() *client.PendingAction { return p.pendingAction } + +// PollIntervalMs returns the server-provided poll interval from the last response. +func (p *poller) PollIntervalMs() int { + return p.pollIntervalMs +} From 2631f82f011bcbd72e98f38935b5403144f79e21 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 00:11:34 +0000 Subject: [PATCH 20/25] fix: resolve race condition in agent samples error path --- src/app/api/agent/heartbeat/route.ts | 57 +++++++++++++++++++--------- 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/src/app/api/agent/heartbeat/route.ts b/src/app/api/agent/heartbeat/route.ts index 6c9f8c9..88338c6 100644 --- a/src/app/api/agent/heartbeat/route.ts +++ b/src/app/api/agent/heartbeat/route.ts @@ -362,25 +362,46 @@ export async function POST(request: Request) { }); if (!request || request.status !== "PENDING") continue; - await prisma.eventSample.create({ - data: { - requestId: result.requestId, - pipelineId: request.pipelineId, - componentKey: result.componentKey ?? "", - events: (result.events ?? []) as Prisma.InputJsonValue, - schema: (result.schema ?? []) as Prisma.InputJsonValue, - error: result.error ?? null, - }, - }); + try { + await prisma.eventSample.create({ + data: { + requestId: result.requestId, + pipelineId: request.pipelineId, + componentKey: result.componentKey ?? "", + events: (result.events ?? []) as Prisma.InputJsonValue, + schema: (result.schema ?? []) as Prisma.InputJsonValue, + error: result.error ?? null, + }, + }); - await prisma.eventSampleRequest.update({ - where: { id: result.requestId }, - data: { - status: result.error ? "ERROR" : "COMPLETED", - completedAt: new Date(), - nodeId: agent.nodeId, - }, - }); + await prisma.eventSampleRequest.update({ + where: { id: result.requestId }, + data: { + status: result.error ? "ERROR" : "COMPLETED", + completedAt: new Date(), + nodeId: agent.nodeId, + }, + }); + } catch (err) { + // Only mark as ERROR if the EventSample write itself failed. + // If another agent already submitted a successful result, the + // request may already be COMPLETED — avoid overwriting that. + const current = await prisma.eventSampleRequest.findUnique({ + where: { id: result.requestId }, + select: { status: true }, + }); + if (current && current.status === "PENDING") { + await prisma.eventSampleRequest.update({ + where: { id: result.requestId }, + data: { + status: "ERROR", + completedAt: new Date(), + nodeId: agent.nodeId, + }, + }); + } + console.error("EventSample write error:", err); + } } } From 3ba3b10d691bc925f28f3f94c8c752588f2c705b Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 00:28:45 +0000 Subject: [PATCH 21/25] fix: close authorization gaps in AI conversation endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Verify pipelineId belongs to teamId in streaming route - Verify conversationId belongs to pipelineId when reusing conversations - Add pipeline ownership check in markSuggestionsApplied (conversationId↔pipelineId) - Use TRPCError instead of raw Error for consistent error handling - Add withAudit middleware to startNewConversation mutation --- src/app/api/ai/pipeline/route.ts | 24 ++++++++++++++++++++++++ src/server/routers/ai.ts | 11 +++++++++-- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/src/app/api/ai/pipeline/route.ts b/src/app/api/ai/pipeline/route.ts index a7e6eac..9441994 100644 --- a/src/app/api/ai/pipeline/route.ts +++ b/src/app/api/ai/pipeline/route.ts @@ -85,6 +85,18 @@ export async function POST(request: Request) { let priorMessages: Array<{ role: "user" | "assistant"; content: string }> = []; if (body.mode === "review" && body.pipelineId) { + // Verify pipelineId belongs to the team + const pipeline = await prisma.pipeline.findUnique({ + where: { id: body.pipelineId }, + select: { environment: { select: { teamId: true } } }, + }); + if (!pipeline || pipeline.environment.teamId !== body.teamId) { + return new Response(JSON.stringify({ error: "Pipeline not found" }), { + status: 404, + headers: { "Content-Type": "application/json" }, + }); + } + if (!conversationId) { const conversation = await prisma.aiConversation.create({ data: { @@ -93,6 +105,18 @@ export async function POST(request: Request) { }, }); conversationId = conversation.id; + } else { + // Verify conversationId belongs to this pipeline + const existing = await prisma.aiConversation.findUnique({ + where: { id: conversationId }, + select: { pipelineId: true }, + }); + if (!existing || existing.pipelineId !== body.pipelineId) { + return new Response(JSON.stringify({ error: "Conversation not found" }), { + status: 404, + headers: { "Content-Type": "application/json" }, + }); + } } await prisma.aiMessage.create({ diff --git a/src/server/routers/ai.ts b/src/server/routers/ai.ts index fe94a79..ee6f2a5 100644 --- a/src/server/routers/ai.ts +++ b/src/server/routers/ai.ts @@ -1,7 +1,9 @@ import { z } from "zod"; +import { TRPCError } from "@trpc/server"; import { router, protectedProcedure, withTeamAccess } from "@/trpc/init"; import { prisma } from "@/lib/prisma"; import { writeAuditLog } from "@/server/services/audit"; +import { withAudit } from "@/server/middleware/audit"; import { Prisma } from "@/generated/prisma"; export const aiRouter = router({ @@ -27,6 +29,7 @@ export const aiRouter = router({ startNewConversation: protectedProcedure .input(z.object({ pipelineId: z.string() })) .use(withTeamAccess("EDITOR")) + .use(withAudit("pipeline.ai_conversation_started", "Pipeline")) .mutation(async ({ input, ctx }) => { const conversation = await prisma.aiConversation.create({ data: { @@ -55,8 +58,12 @@ export const aiRouter = router({ }, }); - if (!message || message.conversationId !== input.conversationId) { - throw new Error("Message not found in conversation"); + if ( + !message || + message.conversationId !== input.conversationId || + message.conversation.pipelineId !== input.pipelineId + ) { + throw new TRPCError({ code: "NOT_FOUND", message: "Message not found in conversation" }); } // Mark suggestions as applied in the JSON From bd86fb418792bba7c4630bbcf453f021f2a9e679 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 00:37:47 +0000 Subject: [PATCH 22/25] fix: prevent ghost undo entry and use withAudit middleware for suggestion tracking - Only push undo snapshot when at least one suggestion is applied (prevents ghost Ctrl+Z entries when all suggestions fail) - Replace manual writeAuditLog with withAudit middleware in markSuggestionsApplied to match project conventions --- src/server/routers/ai.ts | 25 +------------------------ src/stores/flow-store.ts | 8 +++++--- 2 files changed, 6 insertions(+), 27 deletions(-) diff --git a/src/server/routers/ai.ts b/src/server/routers/ai.ts index ee6f2a5..3b3fd6c 100644 --- a/src/server/routers/ai.ts +++ b/src/server/routers/ai.ts @@ -2,7 +2,6 @@ import { z } from "zod"; import { TRPCError } from "@trpc/server"; import { router, protectedProcedure, withTeamAccess } from "@/trpc/init"; import { prisma } from "@/lib/prisma"; -import { writeAuditLog } from "@/server/services/audit"; import { withAudit } from "@/server/middleware/audit"; import { Prisma } from "@/generated/prisma"; @@ -50,6 +49,7 @@ export const aiRouter = router({ }), ) .use(withTeamAccess("EDITOR")) + .use(withAudit("pipeline.ai_suggestion_applied", "Pipeline")) .mutation(async ({ input, ctx }) => { const message = await prisma.aiMessage.findUnique({ where: { id: input.messageId }, @@ -79,29 +79,6 @@ export const aiRouter = router({ data: { suggestions: updatedSuggestions as unknown as Prisma.InputJsonValue }, }); - // Audit log - const pipeline = await prisma.pipeline.findUnique({ - where: { id: message.conversation.pipelineId }, - select: { environmentId: true, environment: { select: { teamId: true } } }, - }); - - writeAuditLog({ - userId: ctx.session.user.id, - action: "pipeline.ai_suggestion_applied", - entityType: "Pipeline", - entityId: message.conversation.pipelineId, - metadata: { - conversationId: input.conversationId, - messageId: input.messageId, - suggestionIds: input.suggestionIds, - suggestionCount: input.suggestionIds.length, - }, - teamId: pipeline?.environment.teamId ?? null, - environmentId: pipeline?.environmentId ?? null, - userEmail: ctx.session.user.email ?? null, - userName: ctx.session.user.name ?? null, - }).catch(() => {}); - return { applied: input.suggestionIds.length }; }), }); diff --git a/src/stores/flow-store.ts b/src/stores/flow-store.ts index 562b1c0..1386a35 100644 --- a/src/stores/flow-store.ts +++ b/src/stores/flow-store.ts @@ -802,8 +802,6 @@ export const useFlowStore = create()((set, get) => ({ let applied = 0; set((state) => { - // Single undo snapshot for the entire batch - const history = pushSnapshot(state); let { nodes, edges } = state; for (const suggestion of suggestions) { @@ -817,11 +815,15 @@ export const useFlowStore = create()((set, get) => ({ } } + // Only push an undo snapshot when something actually changed + if (applied === 0) return {}; + + const history = pushSnapshot(state); return { ...history, nodes, edges, - isDirty: applied > 0 ? true : state.isDirty, + isDirty: true, }; }); From 8e6cabd853a1197ef199ed03de6f8e41a5184972 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 00:56:08 +0000 Subject: [PATCH 23/25] fix: resolve 3 AI conversation state sync bugs - Await prisma.aiMessage.create before sending done:true SSE event, eliminating the race where client refetch returns stale data - Replace temp message IDs with real server-persisted IDs by refetching after streaming completes, so markSuggestionsApplied finds the row - Guard markSuggestionsApplied against temp- prefixed IDs as a safety net - Clear TanStack Query cache in startNewConversation to prevent the sync guard from repopulating the old conversation from cache --- src/app/api/ai/pipeline/route.ts | 26 +++++++++++++++----------- src/hooks/use-ai-conversation.ts | 28 +++++++++++++++++++++------- 2 files changed, 36 insertions(+), 18 deletions(-) diff --git a/src/app/api/ai/pipeline/route.ts b/src/app/api/ai/pipeline/route.ts index 9441994..574d45a 100644 --- a/src/app/api/ai/pipeline/route.ts +++ b/src/app/api/ai/pipeline/route.ts @@ -181,8 +181,6 @@ export async function POST(request: Request) { }, signal: request.signal, }); - controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)); - if (body.mode === "review" && conversationId) { let parsedSuggestions = null; try { @@ -194,15 +192,19 @@ export async function POST(request: Request) { // Not valid JSON — store as raw text } - prisma.aiMessage.create({ - data: { - conversationId, - role: "assistant", - content: fullResponse, - suggestions: (parsedSuggestions as unknown as Prisma.InputJsonValue) ?? undefined, - createdById: session.user.id, - }, - }).catch((err) => console.error("Failed to persist AI response:", err)); + try { + await prisma.aiMessage.create({ + data: { + conversationId, + role: "assistant", + content: fullResponse, + suggestions: (parsedSuggestions as unknown as Prisma.InputJsonValue) ?? undefined, + createdById: session.user.id, + }, + }); + } catch (err) { + console.error("Failed to persist AI response:", err); + } const pipelineForAudit = await prisma.pipeline.findUnique({ where: { id: body.pipelineId! }, @@ -225,6 +227,8 @@ export async function POST(request: Request) { userName: session.user.name ?? null, }).catch(() => {}); } + + controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)); } catch (err) { const message = err instanceof Error ? err.message : "AI request failed"; controller.enqueue( diff --git a/src/hooks/use-ai-conversation.ts b/src/hooks/use-ai-conversation.ts index f3bfc37..7db8953 100644 --- a/src/hooks/use-ai-conversation.ts +++ b/src/hooks/use-ai-conversation.ts @@ -164,8 +164,24 @@ export function useAiConversation({ setMessages((prev) => [...prev, assistantMessage]); setStreamingContent(""); - // Invalidate to sync with server-persisted messages - queryClient.invalidateQueries({ queryKey: trpc.ai.getConversation.queryKey({ pipelineId }) }); + // Refetch to sync local state with server-persisted messages (real IDs) + const refetched = await queryClient.fetchQuery({ + ...trpc.ai.getConversation.queryOptions({ pipelineId }), + staleTime: 0, + }); + if (refetched?.messages) { + setMessages( + refetched.messages.map((m) => ({ + id: m.id, + role: m.role as "user" | "assistant", + content: m.content, + suggestions: m.suggestions as unknown as AiSuggestion[] | undefined, + pipelineYaml: m.pipelineYaml, + createdAt: m.createdAt instanceof Date ? m.createdAt.toISOString() : String(m.createdAt), + createdBy: m.createdBy, + })), + ); + } } catch (err) { if (err instanceof Error && err.name === "AbortError") return; setError(err instanceof Error ? err.message : "AI request failed"); @@ -178,19 +194,17 @@ export function useAiConversation({ ); const startNewConversation = useCallback(() => { + queryClient.removeQueries({ queryKey: trpc.ai.getConversation.queryKey({ pipelineId }) }); setMessages([]); setConversationId(null); setStreamingContent(""); setError(null); - }, []); + }, [queryClient, trpc, pipelineId]); const markSuggestionsApplied = useCallback( (messageId: string, suggestionIds: string[]) => { if (!conversationId) return; - - // The server records which suggestions were applied in the JSON; - // we rely on the query invalidation in markAppliedMutation.onSuccess - // to refresh the conversation state. + if (messageId.startsWith("temp-")) return; // Wait for server-persisted IDs markAppliedMutation.mutate({ pipelineId, From 26ec9b9516f3bc2ed45a89ec34b72069135f6f18 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 09:48:10 +0000 Subject: [PATCH 24/25] fix: new conversation refetch loop and dot-notation config paths - Add isNewConversationRef to block the render-time sync guard from repopulating old messages after React Query background refetches - Resolve dot-notation keys in modify_config changes (e.g. "codec.fields") into proper nested objects instead of creating literal flat keys --- src/hooks/use-ai-conversation.ts | 5 ++++- src/lib/ai/suggestion-applier.ts | 15 ++++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/hooks/use-ai-conversation.ts b/src/hooks/use-ai-conversation.ts index 7db8953..3ad28a2 100644 --- a/src/hooks/use-ai-conversation.ts +++ b/src/hooks/use-ai-conversation.ts @@ -38,6 +38,7 @@ export function useAiConversation({ const [streamingContent, setStreamingContent] = useState(""); const [error, setError] = useState(null); const abortRef = useRef(null); + const isNewConversationRef = useRef(false); // Load existing conversation const conversationQuery = useQuery({ @@ -47,7 +48,7 @@ export function useAiConversation({ // Sync loaded conversation into local state const loadedConversation = conversationQuery.data; - if (loadedConversation && !conversationId && messages.length === 0 && !isStreaming) { + if (loadedConversation && !conversationId && messages.length === 0 && !isStreaming && !isNewConversationRef.current) { setConversationId(loadedConversation.id); setMessages( loadedConversation.messages.map((m) => ({ @@ -74,6 +75,7 @@ export function useAiConversation({ async (prompt: string) => { if (!prompt.trim() || !selectedTeamId || isStreaming) return; + isNewConversationRef.current = false; setIsStreaming(true); setStreamingContent(""); setError(null); @@ -194,6 +196,7 @@ export function useAiConversation({ ); const startNewConversation = useCallback(() => { + isNewConversationRef.current = true; queryClient.removeQueries({ queryKey: trpc.ai.getConversation.queryKey({ pipelineId }) }); setMessages([]); setConversationId(null); diff --git a/src/lib/ai/suggestion-applier.ts b/src/lib/ai/suggestion-applier.ts index 7e80e72..efd82a0 100644 --- a/src/lib/ai/suggestion-applier.ts +++ b/src/lib/ai/suggestion-applier.ts @@ -38,6 +38,16 @@ function findNodeByComponentKey(nodes: Node[], componentKey: string): Node | und return nodes.find((n) => (n.data as Record).componentKey === componentKey); } +/** Deep-set a value at a dot-notation path, returning a shallow-cloned object tree. */ +function setAtPath(obj: Record, path: string, value: unknown): Record { + if (!path.includes(".")) { + return { ...obj, [path]: value }; + } + const [head, ...rest] = path.split("."); + const child = (obj[head] ?? {}) as Record; + return { ...obj, [head]: setAtPath(child, rest.join("."), value) }; +} + function applyModifyConfig( suggestion: AiSuggestion & { type: "modify_config" }, nodes: Node[], @@ -49,7 +59,10 @@ function applyModifyConfig( } const existingConfig = (target.data as Record).config as Record; - const newConfig = { ...existingConfig, ...suggestion.changes }; + let newConfig = { ...existingConfig }; + for (const [key, value] of Object.entries(suggestion.changes)) { + newConfig = setAtPath(newConfig, key, value); + } const newNodes = nodes.map((n) => n.id === target.id From 45d85c22108bcc1809f2fe311c616a93a63081f8 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 10:04:04 +0000 Subject: [PATCH 25/25] fix: wrap markSuggestionsApplied in transaction to prevent lost updates The read-modify-write on the JSONB suggestions column was not atomic, so concurrent team members marking different suggestions could silently overwrite each other's appliedAt markers. --- src/server/routers/ai.ts | 52 +++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/src/server/routers/ai.ts b/src/server/routers/ai.ts index 3b3fd6c..f7a4d9e 100644 --- a/src/server/routers/ai.ts +++ b/src/server/routers/ai.ts @@ -51,34 +51,36 @@ export const aiRouter = router({ .use(withTeamAccess("EDITOR")) .use(withAudit("pipeline.ai_suggestion_applied", "Pipeline")) .mutation(async ({ input, ctx }) => { - const message = await prisma.aiMessage.findUnique({ - where: { id: input.messageId }, - include: { - conversation: { select: { pipelineId: true } }, - }, - }); + return prisma.$transaction(async (tx) => { + const message = await tx.aiMessage.findUnique({ + where: { id: input.messageId }, + include: { + conversation: { select: { pipelineId: true } }, + }, + }); - if ( - !message || - message.conversationId !== input.conversationId || - message.conversation.pipelineId !== input.pipelineId - ) { - throw new TRPCError({ code: "NOT_FOUND", message: "Message not found in conversation" }); - } + if ( + !message || + message.conversationId !== input.conversationId || + message.conversation.pipelineId !== input.pipelineId + ) { + throw new TRPCError({ code: "NOT_FOUND", message: "Message not found in conversation" }); + } - // Mark suggestions as applied in the JSON - const suggestions = (message.suggestions as Array>) ?? []; - const updatedSuggestions = suggestions.map((s) => - input.suggestionIds.includes(s.id as string) - ? { ...s, appliedAt: new Date().toISOString(), appliedById: ctx.session.user.id } - : s, - ); + // Mark suggestions as applied in the JSON + const suggestions = (message.suggestions as Array>) ?? []; + const updatedSuggestions = suggestions.map((s) => + input.suggestionIds.includes(s.id as string) + ? { ...s, appliedAt: new Date().toISOString(), appliedById: ctx.session.user.id } + : s, + ); - await prisma.aiMessage.update({ - where: { id: input.messageId }, - data: { suggestions: updatedSuggestions as unknown as Prisma.InputJsonValue }, - }); + await tx.aiMessage.update({ + where: { id: input.messageId }, + data: { suggestions: updatedSuggestions as unknown as Prisma.InputJsonValue }, + }); - return { applied: input.suggestionIds.length }; + return { applied: input.suggestionIds.length }; + }); }), });