-
-
- {result || (
-
-
- {mode === "generate" ? "Generating pipeline..." : "Reviewing pipeline..."}
-
- )}
-
- {!isStreaming && result && (
-
- {mode === "generate" && (
-
- )}
-
+ {/* ---- Review tab (conversation thread) ---- */}
+
+ {conversation.isLoading ? (
+
+
+ ) : (
+ <>
+ {/* Message thread */}
+
+
+ {conversation.messages.length === 0 && !conversation.isStreaming && (
+
+ Ask the AI to review your pipeline configuration.
+
+ )}
+
+ {conversation.messages.map((msg) => (
+
+ ))}
+
+ {conversation.isStreaming && conversation.streamingContent && (
+
+
+
+
+
+
+ {conversation.streamingContent}
+
+
+
+ )}
+
+ {conversation.isStreaming && !conversation.streamingContent && (
+
+
+ Analyzing pipeline...
+
+ )}
+
+
+
+
+
+ {conversation.error && (
+
+
+ {conversation.error}
+
+ )}
+
+ {/* Input pinned at bottom */}
+
+
+ {conversation.messages.length > 0 && (
+
+ )}
+
+ >
)}
-
- )}
+
+
);
diff --git a/src/components/flow/ai-suggestion-card.tsx b/src/components/flow/ai-suggestion-card.tsx
new file mode 100644
index 00000000..10a9e008
--- /dev/null
+++ b/src/components/flow/ai-suggestion-card.tsx
@@ -0,0 +1,150 @@
+"use client";
+
+import { Checkbox } from "@/components/ui/checkbox";
+import { Badge } from "@/components/ui/badge";
+import { cn } from "@/lib/utils";
+import type { AiSuggestion, SuggestionStatus } from "@/lib/ai/types";
+import { AlertTriangle } from "lucide-react";
+
+interface AiSuggestionCardProps {
+ suggestion: AiSuggestion;
+ status: SuggestionStatus;
+ isSelected: boolean;
+ hasConflict: boolean;
+ conflictReason?: string;
+ onToggle: (id: string) => void;
+}
+
+const TYPE_LABELS: Record
= {
+ modify_config: "Config Change",
+ add_component: "Add Component",
+ remove_component: "Remove Component",
+ modify_connections: "Rewire",
+};
+
+const PRIORITY_COLORS: Record = {
+ high: "bg-red-500/15 text-red-700 dark:text-red-400",
+ medium: "bg-amber-500/15 text-amber-700 dark:text-amber-400",
+ low: "bg-green-500/15 text-green-700 dark:text-green-400",
+};
+
+const STATUS_BADGES: Partial> = {
+ applied: { label: "Applied", className: "bg-green-500/15 text-green-700 dark:text-green-400" },
+ outdated: { label: "Outdated", className: "bg-amber-500/15 text-amber-700 dark:text-amber-400" },
+ invalid: { label: "Invalid", className: "bg-red-500/15 text-red-700 dark:text-red-400" },
+};
+
+export function AiSuggestionCard({
+ suggestion,
+ status,
+ isSelected,
+ hasConflict,
+ conflictReason,
+ onToggle,
+}: AiSuggestionCardProps) {
+ const isDisabled = status === "applied" || status === "invalid";
+ const statusBadge = STATUS_BADGES[status];
+
+ return (
+
+
+
onToggle(suggestion.id)}
+ className="mt-0.5"
+ />
+
+
+
+
+ {suggestion.title}
+
+
+ {statusBadge && (
+
+ {statusBadge.label}
+
+ )}
+
+
+ {suggestion.priority}
+
+
+
+ {TYPE_LABELS[suggestion.type]}
+
+
+
+
+ {renderDescription(suggestion)}
+
+
+ {suggestion.type === "modify_config" && (
+
+ {Object.entries(suggestion.changes).map(([key, value]) => (
+
+ {key}:{" "}
+ {JSON.stringify(value)}
+
+ ))}
+
+ )}
+
+ {hasConflict && conflictReason && (
+
+ )}
+
+
+
+ );
+}
+
+function renderDescription(suggestion: AiSuggestion): React.ReactNode {
+ const desc = suggestion.description;
+
+ // Highlight componentKey references in the description
+ const componentKeys: string[] = [];
+ if (suggestion.type === "modify_config" || suggestion.type === "remove_component") {
+ componentKeys.push(suggestion.componentKey);
+ }
+ if (suggestion.type === "add_component") {
+ componentKeys.push(suggestion.insertAfter, ...suggestion.connectTo);
+ }
+ if (suggestion.type === "modify_connections") {
+ for (const e of suggestion.edgeChanges) {
+ componentKeys.push(e.from, e.to);
+ }
+ }
+
+ if (componentKeys.length === 0) return desc;
+
+ const uniqueKeys = [...new Set(componentKeys)];
+ const pattern = new RegExp(`(${uniqueKeys.map(k => k.replace(/[.*+?^${}()|[\]\\]/g, "\\$&")).join("|")})`, "g");
+ const parts = desc.split(pattern);
+
+ return parts.map((part, i) =>
+ uniqueKeys.includes(part) ? (
+
+ {part}
+
+ ) : (
+ part
+ ),
+ );
+}
diff --git a/src/components/ui/checkbox.tsx b/src/components/ui/checkbox.tsx
new file mode 100644
index 00000000..f5a7e433
--- /dev/null
+++ b/src/components/ui/checkbox.tsx
@@ -0,0 +1,32 @@
+"use client"
+
+import * as React from "react"
+import { CheckIcon } from "lucide-react"
+import { Checkbox as CheckboxPrimitive } from "radix-ui"
+
+import { cn } from "@/lib/utils"
+
+function Checkbox({
+ className,
+ ...props
+}: React.ComponentProps) {
+ return (
+
+
+
+
+
+ )
+}
+
+export { Checkbox }
diff --git a/src/hooks/use-ai-conversation.ts b/src/hooks/use-ai-conversation.ts
new file mode 100644
index 00000000..3ad28a2c
--- /dev/null
+++ b/src/hooks/use-ai-conversation.ts
@@ -0,0 +1,238 @@
+"use client";
+
+import { useState, useRef, useCallback } from "react";
+import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query";
+import { useTRPC } from "@/trpc/client";
+import { useTeamStore } from "@/stores/team-store";
+import type { AiSuggestion } from "@/lib/ai/types";
+import { parseAiReviewResponse } from "@/lib/ai/suggestion-validator";
+
+export interface ConversationMessage {
+ id: string;
+ role: "user" | "assistant";
+ content: string;
+ suggestions?: AiSuggestion[];
+ pipelineYaml?: string | null;
+ createdAt: string;
+ createdBy?: { id: string; name: string | null; image: string | null } | null;
+}
+
+interface UseAiConversationOptions {
+ pipelineId: string;
+ currentYaml?: string;
+ environmentName?: string;
+}
+
+export function useAiConversation({
+ pipelineId,
+ currentYaml,
+ environmentName,
+}: UseAiConversationOptions) {
+ const trpc = useTRPC();
+ const queryClient = useQueryClient();
+ const selectedTeamId = useTeamStore((s) => s.selectedTeamId);
+
+ const [messages, setMessages] = useState([]);
+ const [conversationId, setConversationId] = useState(null);
+ const [isStreaming, setIsStreaming] = useState(false);
+ const [streamingContent, setStreamingContent] = useState("");
+ const [error, setError] = useState(null);
+ const abortRef = useRef(null);
+ const isNewConversationRef = useRef(false);
+
+ // Load existing conversation
+ const conversationQuery = useQuery({
+ ...trpc.ai.getConversation.queryOptions({ pipelineId }),
+ enabled: !!pipelineId,
+ });
+
+ // Sync loaded conversation into local state
+ const loadedConversation = conversationQuery.data;
+ if (loadedConversation && !conversationId && messages.length === 0 && !isStreaming && !isNewConversationRef.current) {
+ setConversationId(loadedConversation.id);
+ setMessages(
+ loadedConversation.messages.map((m) => ({
+ id: m.id,
+ role: m.role as "user" | "assistant",
+ content: m.content,
+ suggestions: m.suggestions as unknown as AiSuggestion[] | undefined,
+ pipelineYaml: m.pipelineYaml,
+ createdAt: m.createdAt instanceof Date ? m.createdAt.toISOString() : String(m.createdAt),
+ createdBy: m.createdBy,
+ })),
+ );
+ }
+
+ const markAppliedMutation = useMutation(
+ trpc.ai.markSuggestionsApplied.mutationOptions({
+ onSuccess: () => {
+ queryClient.invalidateQueries({ queryKey: trpc.ai.getConversation.queryKey({ pipelineId }) });
+ },
+ }),
+ );
+
+ const sendReview = useCallback(
+ async (prompt: string) => {
+ if (!prompt.trim() || !selectedTeamId || isStreaming) return;
+
+ isNewConversationRef.current = false;
+ setIsStreaming(true);
+ setStreamingContent("");
+ setError(null);
+
+ // Add optimistic user message
+ const userMessage: ConversationMessage = {
+ id: `temp-user-${Date.now()}`,
+ role: "user",
+ content: prompt.trim(),
+ pipelineYaml: currentYaml ?? null,
+ createdAt: new Date().toISOString(),
+ };
+ setMessages((prev) => [...prev, userMessage]);
+
+ abortRef.current = new AbortController();
+ let fullResponse = "";
+
+ try {
+ const response = await fetch("/api/ai/pipeline", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({
+ teamId: selectedTeamId,
+ prompt: prompt.trim(),
+ mode: "review",
+ currentYaml,
+ environmentName,
+ pipelineId,
+ conversationId,
+ }),
+ signal: abortRef.current.signal,
+ });
+
+ if (!response.ok) {
+ const errData = await response.json().catch(() => ({ error: "Request failed" }));
+ throw new Error(errData.error || `HTTP ${response.status}`);
+ }
+
+ const reader = response.body?.getReader();
+ if (!reader) throw new Error("No response stream");
+
+ const decoder = new TextDecoder();
+ let buffer = "";
+
+ while (true) {
+ const { done, value } = await reader.read();
+ if (done) break;
+
+ buffer += decoder.decode(value, { stream: true });
+ const lines = buffer.split("\n");
+ buffer = lines.pop() ?? "";
+
+ for (const line of lines) {
+ const trimmed = line.trim();
+ if (!trimmed || !trimmed.startsWith("data: ")) continue;
+
+ try {
+ const data = JSON.parse(trimmed.slice(6));
+ if (data.conversationId) {
+ setConversationId(data.conversationId);
+ continue;
+ }
+ if (data.done) break;
+ if (data.error) throw new Error(data.error);
+ if (data.token) {
+ fullResponse += data.token;
+ setStreamingContent(fullResponse);
+ }
+ } catch (parseErr) {
+ if (parseErr instanceof Error && parseErr.message !== "Unexpected end of JSON input") {
+ throw parseErr;
+ }
+ }
+ }
+ }
+
+ // Parse the completed response
+ const parsed = parseAiReviewResponse(fullResponse);
+
+ const assistantMessage: ConversationMessage = {
+ id: `temp-assistant-${Date.now()}`,
+ role: "assistant",
+ content: fullResponse,
+ suggestions: parsed?.suggestions,
+ pipelineYaml: currentYaml ?? null,
+ createdAt: new Date().toISOString(),
+ };
+ setMessages((prev) => [...prev, assistantMessage]);
+ setStreamingContent("");
+
+ // Refetch to sync local state with server-persisted messages (real IDs)
+ const refetched = await queryClient.fetchQuery({
+ ...trpc.ai.getConversation.queryOptions({ pipelineId }),
+ staleTime: 0,
+ });
+ if (refetched?.messages) {
+ setMessages(
+ refetched.messages.map((m) => ({
+ id: m.id,
+ role: m.role as "user" | "assistant",
+ content: m.content,
+ suggestions: m.suggestions as unknown as AiSuggestion[] | undefined,
+ pipelineYaml: m.pipelineYaml,
+ createdAt: m.createdAt instanceof Date ? m.createdAt.toISOString() : String(m.createdAt),
+ createdBy: m.createdBy,
+ })),
+ );
+ }
+ } catch (err) {
+ if (err instanceof Error && err.name === "AbortError") return;
+ setError(err instanceof Error ? err.message : "AI request failed");
+ } finally {
+ setIsStreaming(false);
+ abortRef.current = null;
+ }
+ },
+ [selectedTeamId, isStreaming, currentYaml, environmentName, pipelineId, conversationId, queryClient, trpc],
+ );
+
+ const startNewConversation = useCallback(() => {
+ isNewConversationRef.current = true;
+ queryClient.removeQueries({ queryKey: trpc.ai.getConversation.queryKey({ pipelineId }) });
+ setMessages([]);
+ setConversationId(null);
+ setStreamingContent("");
+ setError(null);
+ }, [queryClient, trpc, pipelineId]);
+
+ const markSuggestionsApplied = useCallback(
+ (messageId: string, suggestionIds: string[]) => {
+ if (!conversationId) return;
+ if (messageId.startsWith("temp-")) return; // Wait for server-persisted IDs
+
+ markAppliedMutation.mutate({
+ pipelineId,
+ conversationId,
+ messageId,
+ suggestionIds,
+ });
+ },
+ [conversationId, pipelineId, markAppliedMutation],
+ );
+
+ const cancelStreaming = useCallback(() => {
+ abortRef.current?.abort();
+ }, []);
+
+ return {
+ messages,
+ conversationId,
+ isStreaming,
+ streamingContent,
+ error,
+ isLoading: conversationQuery.isLoading,
+ sendReview,
+ startNewConversation,
+ markSuggestionsApplied,
+ cancelStreaming,
+ };
+}
diff --git a/src/lib/ai/conflict-detector.ts b/src/lib/ai/conflict-detector.ts
new file mode 100644
index 00000000..ab2611f3
--- /dev/null
+++ b/src/lib/ai/conflict-detector.ts
@@ -0,0 +1,107 @@
+// src/lib/ai/conflict-detector.ts
+import type { AiSuggestion } from "./types";
+
+export interface ConflictPair {
+ a: string;
+ b: string;
+ reason: string;
+}
+
+/**
+ * Detect conflicts between selected suggestions.
+ * Returns pairs of conflicting suggestion IDs with reasons.
+ */
+export function detectConflicts(suggestions: AiSuggestion[]): ConflictPair[] {
+ const conflicts: ConflictPair[] = [];
+
+ for (let i = 0; i < suggestions.length; i++) {
+ for (let j = i + 1; j < suggestions.length; j++) {
+ const conflict = checkPairConflict(suggestions[i], suggestions[j]);
+ if (conflict) {
+ conflicts.push({ a: suggestions[i].id, b: suggestions[j].id, reason: conflict });
+ }
+ }
+ }
+
+ return conflicts;
+}
+
+function checkPairConflict(a: AiSuggestion, b: AiSuggestion): string | null {
+ // Same-type: two modify_config on same component with overlapping keys
+ if (a.type === "modify_config" && b.type === "modify_config") {
+ if (a.componentKey === b.componentKey) {
+ const keysA = Object.keys(a.changes);
+ const keysB = Object.keys(b.changes);
+ const overlap = keysA.filter((k) => keysB.includes(k));
+ if (overlap.length > 0) {
+ return `Both modify "${a.componentKey}" config keys: ${overlap.join(", ")}`;
+ }
+ }
+ }
+
+ // Same-type: contradicting modify_connections
+ if (a.type === "modify_connections" && b.type === "modify_connections") {
+ for (const ea of a.edgeChanges) {
+ for (const eb of b.edgeChanges) {
+ if (ea.from === eb.from && ea.to === eb.to) {
+ if (ea.action !== eb.action) {
+ return `Contradicting edge changes: ${ea.from} to ${ea.to}`;
+ }
+ if (ea.action === "add" && eb.action === "add") {
+ return `Duplicate edge addition: ${ea.from} to ${ea.to}`;
+ }
+ }
+ }
+ }
+ }
+
+ // Cross-type: remove_component vs anything referencing that component
+ if (a.type === "remove_component" || b.type === "remove_component") {
+ const remover = (a.type === "remove_component" ? a : b) as AiSuggestion & { type: "remove_component" };
+ const other = a.type === "remove_component" ? b : a;
+ const removedKey = remover.componentKey;
+ const referencedKeys = getReferencedComponentKeys(other);
+ if (referencedKeys.has(removedKey)) {
+ return `"${removedKey}" is removed by one suggestion but referenced by another`;
+ }
+ }
+
+ // Cross-type: add_component connectTo vs modify_connections removing same edge
+ if (
+ (a.type === "add_component" && b.type === "modify_connections") ||
+ (a.type === "modify_connections" && b.type === "add_component")
+ ) {
+ const adder = a.type === "add_component" ? a : b as AiSuggestion & { type: "add_component" };
+ const modifier = a.type === "modify_connections" ? a : b as AiSuggestion & { type: "modify_connections" };
+ for (const edge of modifier.edgeChanges) {
+ if (edge.action === "remove" && adder.connectTo.includes(edge.to)) {
+ return `add_component connects to "${edge.to}" but modify_connections removes an edge to it`;
+ }
+ }
+ }
+
+ return null;
+}
+
+function getReferencedComponentKeys(s: AiSuggestion): Set {
+ const keys = new Set();
+ switch (s.type) {
+ case "modify_config":
+ keys.add(s.componentKey);
+ break;
+ case "add_component":
+ keys.add(s.insertAfter);
+ for (const k of s.connectTo) keys.add(k);
+ break;
+ case "remove_component":
+ keys.add(s.componentKey);
+ break;
+ case "modify_connections":
+ for (const e of s.edgeChanges) {
+ keys.add(e.from);
+ keys.add(e.to);
+ }
+ break;
+ }
+ return keys;
+}
diff --git a/src/lib/ai/prompts.ts b/src/lib/ai/prompts.ts
index bfe5cb92..66524141 100644
--- a/src/lib/ai/prompts.ts
+++ b/src/lib/ai/prompts.ts
@@ -61,10 +61,37 @@ export function buildPipelineSystemPrompt(context: {
} else {
parts.push(
"You are a Vector pipeline configuration reviewer.",
- "Analyze the provided Vector pipeline YAML and provide improvement suggestions.",
- "Focus on: performance, correctness, best practices, and potential issues.",
- "If the user asks for a revised config, output the complete corrected YAML with no markdown fencing.",
- "Otherwise, provide suggestions as concise text.",
+ "Analyze the provided Vector pipeline YAML and return your response as a JSON object.",
+ "",
+ "Response format (return ONLY this JSON, no markdown fencing, no extra text):",
+ JSON.stringify({
+ summary: "2-3 sentence analysis of the pipeline",
+ suggestions: [
+ {
+ id: "s1",
+ type: "modify_config",
+ title: "Short title",
+ description: "Why this helps",
+ priority: "high|medium|low",
+ componentKey: "existing_component_key",
+ changes: { "config.field": "new_value" },
+ },
+ ],
+ }, null, 2),
+ "",
+ "Suggestion types:",
+ '- modify_config: { type: "modify_config", componentKey, changes: { field: value } }',
+ '- add_component: { type: "add_component", component: { key, componentType, kind: "source"|"transform"|"sink", config }, insertAfter: "existing_key", connectTo: ["downstream_key"] }',
+ '- remove_component: { type: "remove_component", componentKey, reconnect: true|false }',
+ '- modify_connections: { type: "modify_connections", edgeChanges: [{ action: "add"|"remove", from: "key", to: "key" }] }',
+ "",
+ "Rules:",
+ "- Each suggestion needs a unique id (s1, s2, s3...)",
+ "- componentKey values MUST match real keys from the provided YAML",
+ "- Focus on: performance, correctness, best practices, potential issues",
+ "- Prioritize: high = likely bug or major perf issue, medium = optimization, low = cleanup",
+ "- Return valid JSON only. No markdown, no code fences, no commentary outside the JSON.",
+ "- Even in follow-up messages, always return the full JSON object. Never mix prose with JSON.",
);
}
diff --git a/src/lib/ai/suggestion-applier.ts b/src/lib/ai/suggestion-applier.ts
new file mode 100644
index 00000000..efd82a0f
--- /dev/null
+++ b/src/lib/ai/suggestion-applier.ts
@@ -0,0 +1,203 @@
+// src/lib/ai/suggestion-applier.ts
+import type { Node, Edge } from "@xyflow/react";
+import type { AiSuggestion } from "./types";
+import { findComponentDef } from "@/lib/vector/catalog";
+import { generateComponentKey } from "@/lib/component-key";
+import { generateId } from "@/lib/utils";
+
+interface ApplyResult {
+ nodes: Node[];
+ edges: Edge[];
+ error?: string;
+}
+
+/**
+ * Apply a single suggestion to the current flow state.
+ * Returns new nodes/edges arrays (immutable).
+ */
+export function applySuggestion(
+ suggestion: AiSuggestion,
+ nodes: Node[],
+ edges: Edge[],
+): ApplyResult {
+ switch (suggestion.type) {
+ case "modify_config":
+ return applyModifyConfig(suggestion, nodes, edges);
+ case "add_component":
+ return applyAddComponent(suggestion, nodes, edges);
+ case "remove_component":
+ return applyRemoveComponent(suggestion, nodes, edges);
+ case "modify_connections":
+ return applyModifyConnections(suggestion, nodes, edges);
+ default:
+ return { nodes, edges, error: "Unknown suggestion type" };
+ }
+}
+
+function findNodeByComponentKey(nodes: Node[], componentKey: string): Node | undefined {
+ return nodes.find((n) => (n.data as Record).componentKey === componentKey);
+}
+
+/** Deep-set a value at a dot-notation path, returning a shallow-cloned object tree. */
+function setAtPath(obj: Record, path: string, value: unknown): Record {
+ if (!path.includes(".")) {
+ return { ...obj, [path]: value };
+ }
+ const [head, ...rest] = path.split(".");
+ const child = (obj[head] ?? {}) as Record;
+ return { ...obj, [head]: setAtPath(child, rest.join("."), value) };
+}
+
+function applyModifyConfig(
+ suggestion: AiSuggestion & { type: "modify_config" },
+ nodes: Node[],
+ edges: Edge[],
+): ApplyResult {
+ const target = findNodeByComponentKey(nodes, suggestion.componentKey);
+ if (!target) {
+ return { nodes, edges, error: `Component "${suggestion.componentKey}" not found` };
+ }
+
+ const existingConfig = (target.data as Record).config as Record;
+ let newConfig = { ...existingConfig };
+ for (const [key, value] of Object.entries(suggestion.changes)) {
+ newConfig = setAtPath(newConfig, key, value);
+ }
+
+ const newNodes = nodes.map((n) =>
+ n.id === target.id
+ ? { ...n, data: { ...n.data, config: newConfig } }
+ : n,
+ );
+
+ return { nodes: newNodes, edges };
+}
+
+function applyAddComponent(
+ suggestion: AiSuggestion & { type: "add_component" },
+ nodes: Node[],
+ edges: Edge[],
+): ApplyResult {
+ const { component, insertAfter, connectTo } = suggestion;
+
+ const componentDef = findComponentDef(component.componentType, component.kind);
+ if (!componentDef) {
+ return { nodes, edges, error: `Unknown component type "${component.componentType}"` };
+ }
+
+ const afterNode = findNodeByComponentKey(nodes, insertAfter);
+ if (!afterNode) {
+ return { nodes, edges, error: `Component "${insertAfter}" not found for insertAfter` };
+ }
+
+ const position = {
+ x: afterNode.position.x,
+ y: afterNode.position.y + 150,
+ };
+
+ const newNodeId = generateId();
+ const newComponentKey = generateComponentKey(component.componentType);
+
+ const newNode: Node = {
+ id: newNodeId,
+ type: component.kind,
+ position,
+ data: {
+ componentDef,
+ componentKey: newComponentKey,
+ displayName: componentDef.displayName,
+ config: component.config,
+ },
+ };
+
+ let newEdges = [...edges];
+
+ // Add edge: afterNode to newNode (once, regardless of connectTo count)
+ newEdges.push({ id: generateId(), source: afterNode.id, target: newNodeId });
+
+ for (const downstreamKey of connectTo) {
+ const downstreamNode = findNodeByComponentKey(nodes, downstreamKey);
+ if (!downstreamNode) continue;
+
+ // Remove existing edge from afterNode to downstream
+ newEdges = newEdges.filter(
+ (e) => !(e.source === afterNode.id && e.target === downstreamNode.id),
+ );
+
+ // Add edge: newNode to downstream
+ newEdges.push({ id: generateId(), source: newNodeId, target: downstreamNode.id });
+ }
+
+ return { nodes: [...nodes, newNode], edges: newEdges };
+}
+
+function applyRemoveComponent(
+ suggestion: AiSuggestion & { type: "remove_component" },
+ nodes: Node[],
+ edges: Edge[],
+): ApplyResult {
+ const target = findNodeByComponentKey(nodes, suggestion.componentKey);
+ if (!target) {
+ return { nodes, edges, error: `Component "${suggestion.componentKey}" not found` };
+ }
+
+ if ((target.data as Record).isSystemLocked) {
+ return { nodes, edges, error: `Component "${suggestion.componentKey}" is system-locked` };
+ }
+
+ const incomingEdges = edges.filter((e) => e.target === target.id);
+ const outgoingEdges = edges.filter((e) => e.source === target.id);
+
+ const newEdges = edges.filter((e) => e.source !== target.id && e.target !== target.id);
+ const newNodes = nodes.filter((n) => n.id !== target.id);
+
+ if (suggestion.reconnect) {
+ for (const incoming of incomingEdges) {
+ for (const outgoing of outgoingEdges) {
+ newEdges.push({
+ id: generateId(),
+ source: incoming.source,
+ target: outgoing.target,
+ });
+ }
+ }
+ }
+
+ return { nodes: newNodes, edges: newEdges };
+}
+
+function applyModifyConnections(
+ suggestion: AiSuggestion & { type: "modify_connections" },
+ nodes: Node[],
+ edges: Edge[],
+): ApplyResult {
+ let newEdges = [...edges];
+
+ for (const change of suggestion.edgeChanges) {
+ const fromNode = findNodeByComponentKey(nodes, change.from);
+ const toNode = findNodeByComponentKey(nodes, change.to);
+
+ if (!fromNode || !toNode) {
+ return {
+ nodes,
+ edges,
+ error: `Component "${!fromNode ? change.from : change.to}" not found`,
+ };
+ }
+
+ if (change.action === "add") {
+ const exists = newEdges.some(
+ (e) => e.source === fromNode.id && e.target === toNode.id,
+ );
+ if (!exists) {
+ newEdges.push({ id: generateId(), source: fromNode.id, target: toNode.id });
+ }
+ } else {
+ newEdges = newEdges.filter(
+ (e) => !(e.source === fromNode.id && e.target === toNode.id),
+ );
+ }
+ }
+
+ return { nodes, edges: newEdges };
+}
diff --git a/src/lib/ai/suggestion-validator.ts b/src/lib/ai/suggestion-validator.ts
new file mode 100644
index 00000000..2fc05979
--- /dev/null
+++ b/src/lib/ai/suggestion-validator.ts
@@ -0,0 +1,101 @@
+// src/lib/ai/suggestion-validator.ts
+import type { Node } from "@xyflow/react";
+import type { AiSuggestion, AiReviewResponse, SuggestionStatus } from "./types";
+
+/**
+ * Validate a parsed AI response. Returns the response if valid, null if not.
+ */
+export function parseAiReviewResponse(raw: string): AiReviewResponse | null {
+ try {
+ const parsed = JSON.parse(raw);
+ if (
+ typeof parsed === "object" &&
+ parsed !== null &&
+ typeof parsed.summary === "string" &&
+ Array.isArray(parsed.suggestions)
+ ) {
+ return parsed as AiReviewResponse;
+ }
+ return null;
+ } catch {
+ return null;
+ }
+}
+
+/**
+ * Validate that suggestion references exist on the canvas.
+ */
+export function validateSuggestions(
+ suggestions: AiSuggestion[],
+ nodes: Node[],
+): Map {
+ const componentKeys = new Set(
+ nodes.map((n) => (n.data as Record).componentKey as string),
+ );
+
+ const statuses = new Map();
+
+ for (const s of suggestions) {
+ const referencedKeys = getReferencedKeys(s);
+ const allValid = referencedKeys.every((k) => componentKeys.has(k));
+ statuses.set(s.id, allValid ? "actionable" : "invalid");
+ }
+
+ return statuses;
+}
+
+/**
+ * Determine which suggestions are outdated by comparing pipeline YAML snapshots.
+ */
+export function detectOutdatedSuggestions(
+ suggestions: AiSuggestion[],
+ snapshotYaml: string | null,
+ currentYaml: string,
+): Set {
+ if (!snapshotYaml || snapshotYaml === currentYaml) {
+ return new Set();
+ }
+
+ const outdated = new Set();
+
+ for (const s of suggestions) {
+ const keys = getReferencedKeys(s);
+ for (const key of keys) {
+ const snapshotBlock = extractComponentBlock(snapshotYaml, key);
+ const currentBlock = extractComponentBlock(currentYaml, key);
+ if (snapshotBlock !== currentBlock) {
+ outdated.add(s.id);
+ break;
+ }
+ }
+ }
+
+ return outdated;
+}
+
+function getReferencedKeys(s: AiSuggestion): string[] {
+ switch (s.type) {
+ case "modify_config":
+ return [s.componentKey];
+ case "add_component":
+ return [s.insertAfter, ...s.connectTo];
+ case "remove_component":
+ return [s.componentKey];
+ case "modify_connections":
+ return s.edgeChanges.flatMap((e) => [e.from, e.to]);
+ }
+}
+
+function extractComponentBlock(yaml: string, componentKey: string): string | null {
+ const escaped = componentKey.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
+ const regex = new RegExp(`^ ${escaped}:\\s*$`, "m");
+ const match = regex.exec(yaml);
+ if (!match) return null;
+
+ const start = match.index;
+ const rest = yaml.slice(start + match[0].length);
+ const nextKey = rest.search(/^\s{2}\S/m);
+ const end = nextKey === -1 ? yaml.length : start + match[0].length + nextKey;
+
+ return yaml.slice(start, end).trim();
+}
diff --git a/src/lib/ai/types.ts b/src/lib/ai/types.ts
new file mode 100644
index 00000000..b267abf3
--- /dev/null
+++ b/src/lib/ai/types.ts
@@ -0,0 +1,55 @@
+// src/lib/ai/types.ts
+
+export interface AiSuggestionBase {
+ id: string;
+ title: string;
+ description: string;
+ priority: "high" | "medium" | "low";
+}
+
+export interface ModifyConfigSuggestion {
+ type: "modify_config";
+ componentKey: string;
+ changes: Record;
+}
+
+export interface AddComponentSuggestion {
+ type: "add_component";
+ component: {
+ key: string;
+ componentType: string;
+ kind: "source" | "transform" | "sink";
+ config: Record;
+ };
+ insertAfter: string;
+ connectTo: string[];
+}
+
+export interface RemoveComponentSuggestion {
+ type: "remove_component";
+ componentKey: string;
+ reconnect: boolean;
+}
+
+export interface ModifyConnectionsSuggestion {
+ type: "modify_connections";
+ edgeChanges: Array<{
+ action: "add" | "remove";
+ from: string;
+ to: string;
+ }>;
+}
+
+export type AiSuggestion =
+ | (AiSuggestionBase & ModifyConfigSuggestion)
+ | (AiSuggestionBase & AddComponentSuggestion)
+ | (AiSuggestionBase & RemoveComponentSuggestion)
+ | (AiSuggestionBase & ModifyConnectionsSuggestion);
+
+export interface AiReviewResponse {
+ summary: string;
+ suggestions: AiSuggestion[];
+}
+
+/** State of a suggestion in the UI */
+export type SuggestionStatus = "actionable" | "applied" | "outdated" | "invalid";
diff --git a/src/server/routers/ai.ts b/src/server/routers/ai.ts
new file mode 100644
index 00000000..f7a4d9eb
--- /dev/null
+++ b/src/server/routers/ai.ts
@@ -0,0 +1,86 @@
+import { z } from "zod";
+import { TRPCError } from "@trpc/server";
+import { router, protectedProcedure, withTeamAccess } from "@/trpc/init";
+import { prisma } from "@/lib/prisma";
+import { withAudit } from "@/server/middleware/audit";
+import { Prisma } from "@/generated/prisma";
+
+export const aiRouter = router({
+ getConversation: protectedProcedure
+ .input(z.object({ pipelineId: z.string() }))
+ .use(withTeamAccess("VIEWER"))
+ .query(async ({ input }) => {
+ const conversation = await prisma.aiConversation.findFirst({
+ where: { pipelineId: input.pipelineId },
+ orderBy: { createdAt: "desc" },
+ include: {
+ messages: {
+ orderBy: { createdAt: "asc" },
+ include: {
+ createdBy: { select: { id: true, name: true, image: true } },
+ },
+ },
+ },
+ });
+ return conversation;
+ }),
+
+ startNewConversation: protectedProcedure
+ .input(z.object({ pipelineId: z.string() }))
+ .use(withTeamAccess("EDITOR"))
+ .use(withAudit("pipeline.ai_conversation_started", "Pipeline"))
+ .mutation(async ({ input, ctx }) => {
+ const conversation = await prisma.aiConversation.create({
+ data: {
+ pipelineId: input.pipelineId,
+ createdById: ctx.session.user.id,
+ },
+ });
+ return conversation;
+ }),
+
+ markSuggestionsApplied: protectedProcedure
+ .input(
+ z.object({
+ pipelineId: z.string(),
+ conversationId: z.string(),
+ messageId: z.string(),
+ suggestionIds: z.array(z.string()),
+ }),
+ )
+ .use(withTeamAccess("EDITOR"))
+ .use(withAudit("pipeline.ai_suggestion_applied", "Pipeline"))
+ .mutation(async ({ input, ctx }) => {
+ return prisma.$transaction(async (tx) => {
+ const message = await tx.aiMessage.findUnique({
+ where: { id: input.messageId },
+ include: {
+ conversation: { select: { pipelineId: true } },
+ },
+ });
+
+ if (
+ !message ||
+ message.conversationId !== input.conversationId ||
+ message.conversation.pipelineId !== input.pipelineId
+ ) {
+ throw new TRPCError({ code: "NOT_FOUND", message: "Message not found in conversation" });
+ }
+
+ // Mark suggestions as applied in the JSON
+ const suggestions = (message.suggestions as Array>) ?? [];
+ const updatedSuggestions = suggestions.map((s) =>
+ input.suggestionIds.includes(s.id as string)
+ ? { ...s, appliedAt: new Date().toISOString(), appliedById: ctx.session.user.id }
+ : s,
+ );
+
+ await tx.aiMessage.update({
+ where: { id: input.messageId },
+ data: { suggestions: updatedSuggestions as unknown as Prisma.InputJsonValue },
+ });
+
+ return { applied: input.suggestionIds.length };
+ });
+ }),
+});
diff --git a/src/server/services/ai.ts b/src/server/services/ai.ts
index b29aa182..0d81a16e 100644
--- a/src/server/services/ai.ts
+++ b/src/server/services/ai.ts
@@ -53,7 +53,7 @@ function validateBaseUrl(baseUrl: string): void {
interface StreamCompletionParams {
teamId: string;
systemPrompt: string;
- userPrompt: string;
+ messages: Array<{ role: "user" | "assistant"; content: string }>;
onToken: (token: string) => void;
signal?: AbortSignal;
}
@@ -102,7 +102,7 @@ export async function getTeamAiConfig(teamId: string, { requireEnabled = true }
export async function streamCompletion({
teamId,
systemPrompt,
- userPrompt,
+ messages,
onToken,
signal,
}: StreamCompletionParams): Promise {
@@ -126,8 +126,8 @@ export async function streamCompletion({
model: config.model,
stream: true,
messages: [
- { role: "system", content: systemPrompt },
- { role: "user", content: userPrompt },
+ { role: "system" as const, content: systemPrompt },
+ ...messages.map((m) => ({ role: m.role as "user" | "assistant", content: m.content })),
],
}),
signal,
diff --git a/src/stores/flow-store.ts b/src/stores/flow-store.ts
index 5de01e73..1386a351 100644
--- a/src/stores/flow-store.ts
+++ b/src/stores/flow-store.ts
@@ -1,6 +1,8 @@
import { create } from "zustand";
import { generateId } from "@/lib/utils";
import { generateComponentKey } from "@/lib/component-key";
+import { applySuggestion } from "@/lib/ai/suggestion-applier";
+import type { AiSuggestion } from "@/lib/ai/types";
import {
type Node,
type Edge,
@@ -112,6 +114,9 @@ export interface FlowState {
// Dirty tracking
markClean: () => void;
+ // AI suggestions
+ applySuggestions: (suggestions: AiSuggestion[]) => { applied: number; errors: string[] };
+
// Serialization
loadGraph: (nodes: Node[], edges: Edge[], globalConfig?: Record | null, options?: { isSystem?: boolean }) => void;
clearGraph: () => void;
@@ -790,6 +795,41 @@ export const useFlowStore = create()((set, get) => ({
set({ isDirty: false, _savedSnapshot: snapshot } as Partial);
},
+ /* ---- AI suggestions ---- */
+
+ applySuggestions: (suggestions) => {
+ const errors: string[] = [];
+ let applied = 0;
+
+ set((state) => {
+ let { nodes, edges } = state;
+
+ for (const suggestion of suggestions) {
+ const result = applySuggestion(suggestion, nodes, edges);
+ if (result.error) {
+ errors.push(result.error);
+ } else {
+ nodes = result.nodes;
+ edges = result.edges;
+ applied++;
+ }
+ }
+
+ // Only push an undo snapshot when something actually changed
+ if (applied === 0) return {};
+
+ const history = pushSnapshot(state);
+ return {
+ ...history,
+ nodes,
+ edges,
+ isDirty: true,
+ };
+ });
+
+ return { applied, errors };
+ },
+
/* ---- Serialization ---- */
loadGraph: (nodes, edges, globalConfig, options) => {
diff --git a/src/trpc/router.ts b/src/trpc/router.ts
index dc0871a1..3ff20276 100644
--- a/src/trpc/router.ts
+++ b/src/trpc/router.ts
@@ -20,6 +20,7 @@ import { alertRouter } from "@/server/routers/alert";
import { serviceAccountRouter } from "@/server/routers/service-account";
import { userPreferenceRouter } from "@/server/routers/user-preference";
import { sharedComponentRouter } from "@/server/routers/shared-component";
+import { aiRouter } from "@/server/routers/ai";
export const appRouter = router({
team: teamRouter,
@@ -43,6 +44,7 @@ export const appRouter = router({
serviceAccount: serviceAccountRouter,
userPreference: userPreferenceRouter,
sharedComponent: sharedComponentRouter,
+ ai: aiRouter,
});
export type AppRouter = typeof appRouter;