diff --git a/agent/internal/agent/agent.go b/agent/internal/agent/agent.go index cde2066d..55e800b4 100644 --- a/agent/internal/agent/agent.go +++ b/agent/internal/agent/agent.go @@ -84,7 +84,8 @@ func (a *Agent) Run() error { }() // Main loop: poll + heartbeat - ticker := time.NewTicker(a.cfg.PollInterval) + currentInterval := a.cfg.PollInterval + ticker := time.NewTicker(currentInterval) defer ticker.Stop() // Do first poll immediately @@ -107,6 +108,7 @@ func (a *Agent) Run() error { } a.sendHeartbeat() + currentInterval = a.maybeResetTicker(ticker, currentInterval) for { select { @@ -124,6 +126,7 @@ func (a *Agent) Run() error { case <-ticker.C: a.pollAndApply() a.sendHeartbeat() + currentInterval = a.maybeResetTicker(ticker, currentInterval) case msg := <-a.wsCh: a.handleWsMessage(msg, ticker) case <-a.immediateHeartbeatCh: @@ -132,6 +135,22 @@ func (a *Agent) Run() error { } } +// maybeResetTicker checks if the server provided a new poll interval and resets +// the ticker if it changed. Returns the (possibly updated) current interval. 
+func (a *Agent) maybeResetTicker(ticker *time.Ticker, current time.Duration) time.Duration {
+	serverMs := a.poller.PollIntervalMs()
+	if serverMs <= 0 {
+		return current
+	}
+	serverInterval := time.Duration(serverMs) * time.Millisecond
+	if serverInterval != current {
+		slog.Info("poll interval updated by server", "old", current, "new", serverInterval)
+		ticker.Reset(serverInterval)
+		return serverInterval
+	}
+	return current
+}
+
 func (a *Agent) pollAndApply() {
 	actions, err := a.poller.Poll()
 	if err != nil {
diff --git a/agent/internal/agent/poller.go b/agent/internal/agent/poller.go
index 5f9426ab..68cd9364 100644
--- a/agent/internal/agent/poller.go
+++ b/agent/internal/agent/poller.go
@@ -29,6 +29,7 @@ type poller struct {
 	known map[string]pipelineState // pipelineId -> last known state
 	sampleRequests []client.SampleRequestMsg
 	pendingAction *client.PendingAction
+	pollIntervalMs int // server-provided poll interval from last response
 	websocketUrl string
 }
@@ -190,6 +191,9 @@ func (p *poller) Poll() ([]PipelineAction, error) {
 	// Store pending action (e.g. self-update) for the agent to handle
 	p.pendingAction = resp.PendingAction
+	// Store server-provided poll interval
+	p.pollIntervalMs = resp.PollIntervalMs
+
 	// Store websocket URL for the agent to use
 	if resp.WebSocketURL != "" {
 		p.websocketUrl = resp.WebSocketURL
@@ -212,6 +216,14 @@ func (p *poller) PendingAction() *client.PendingAction {
 	return p.pendingAction
 }
+// PollIntervalMs returns the server-provided poll interval (milliseconds)
+// from the last poll response, or 0 if none has been received yet.
+// Guarded by p.mu to match WebSocketURL and the writer in Poll — the field
+// is written from the poll path and read from the agent's main loop.
+func (p *poller) PollIntervalMs() int {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	return p.pollIntervalMs
+}
+
 // WebSocketURL returns the WebSocket URL from the last config response. 
func (p *poller) WebSocketURL() string { p.mu.Lock() diff --git a/docs/public/user-guide/ai-suggestions.md b/docs/public/user-guide/ai-suggestions.md index e04241c5..5c95ac22 100644 --- a/docs/public/user-guide/ai-suggestions.md +++ b/docs/public/user-guide/ai-suggestions.md @@ -50,7 +50,44 @@ Ask the AI to analyze your current pipeline configuration: > "Are there any performance issues with my pipeline?" -The AI reviews the generated YAML and provides suggestions for improvements, best practices, and potential issues. +The AI returns structured, actionable suggestion cards that you can selectively apply to your canvas. + +#### Suggestion cards + +Each suggestion appears as an interactive card showing: + +- **Title** and **description** explaining why the change helps +- **Priority badge** (High, Medium, Low) +- **Type badge** — Config Change, Add Component, Remove Component, or Rewire +- **Checkbox** for batch selection +- **Config preview** for configuration changes showing the exact fields that will be modified + +#### Applying suggestions + +- **Apply All** — applies every actionable suggestion from that AI response +- **Apply Selected** — applies only the suggestions you have checked + +Applied suggestions are marked with a green "Applied" badge and cannot be re-applied. The entire batch is a single undo operation — press **Ctrl+Z** (or **Cmd+Z**) to revert all changes at once. + +#### Conflict detection + +When you select multiple suggestions that conflict (e.g., two suggestions modifying the same config field, or one removing a component that another references), an amber warning appears on the affected cards explaining the conflict. You can still apply conflicting suggestions, but review the warnings first. 
+ +#### Suggestion statuses + +| Status | Meaning | +|--------|---------| +| **Actionable** | Ready to apply | +| **Applied** | Already applied to the canvas | +| **Outdated** | The pipeline changed since this suggestion was made | +| **Invalid** | References a component that no longer exists on the canvas | + +#### Conversations + +Review conversations are persistent — they are saved per pipeline and visible to all team members with access. You can: + +- **Ask follow-up questions** using the input at the bottom of the dialog +- **Start a new conversation** by clicking "New Conversation" below the input ## Rate Limits diff --git a/prisma/migrations/20260310030000_add_ai_conversations/migration.sql b/prisma/migrations/20260310030000_add_ai_conversations/migration.sql new file mode 100644 index 00000000..77d5f757 --- /dev/null +++ b/prisma/migrations/20260310030000_add_ai_conversations/migration.sql @@ -0,0 +1,42 @@ +-- CreateTable +CREATE TABLE "AiConversation" ( + "id" TEXT NOT NULL, + "pipelineId" TEXT NOT NULL, + "createdById" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "AiConversation_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "AiMessage" ( + "id" TEXT NOT NULL, + "conversationId" TEXT NOT NULL, + "role" TEXT NOT NULL, + "content" TEXT NOT NULL, + "suggestions" JSONB, + "pipelineYaml" TEXT, + "createdById" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "AiMessage_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "AiConversation_pipelineId_createdAt_idx" ON "AiConversation"("pipelineId", "createdAt"); + +-- CreateIndex +CREATE INDEX "AiMessage_conversationId_createdAt_idx" ON "AiMessage"("conversationId", "createdAt"); + +-- AddForeignKey +ALTER TABLE "AiConversation" ADD CONSTRAINT "AiConversation_pipelineId_fkey" FOREIGN KEY ("pipelineId") REFERENCES "Pipeline"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- 
AddForeignKey +ALTER TABLE "AiConversation" ADD CONSTRAINT "AiConversation_createdById_fkey" FOREIGN KEY ("createdById") REFERENCES "User"("id") ON DELETE SET NULL ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "AiMessage" ADD CONSTRAINT "AiMessage_conversationId_fkey" FOREIGN KEY ("conversationId") REFERENCES "AiConversation"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "AiMessage" ADD CONSTRAINT "AiMessage_createdById_fkey" FOREIGN KEY ("createdById") REFERENCES "User"("id") ON DELETE SET NULL ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index b529d5f9..3a244320 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -35,6 +35,8 @@ model User { deployRequestsReviewed DeployRequest[] @relation("deployReviewer") deployRequestsExecuted DeployRequest[] @relation("deployExecutor") preferences UserPreference[] + aiConversationsCreated AiConversation[] @relation("AiConversationCreatedBy") + aiMessagesCreated AiMessage[] @relation("AiMessageCreatedBy") createdAt DateTime @default(now()) } @@ -277,6 +279,7 @@ model Pipeline { slis PipelineSli[] enrichMetadata Boolean @default(false) tags Json? @default("[]") // string[] of classification tags like ["PII", "PCI-DSS"] + aiConversations AiConversation[] deployRequests DeployRequest[] createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@ -753,3 +756,31 @@ model ServiceAccount { @@index([hashedKey]) @@index([environmentId]) } + +model AiConversation { + id String @id @default(cuid()) + pipelineId String + pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade) + createdById String? + createdBy User? 
@relation("AiConversationCreatedBy", fields: [createdById], references: [id], onDelete: SetNull) + messages AiMessage[] + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@index([pipelineId, createdAt]) +} + +model AiMessage { + id String @id @default(cuid()) + conversationId String + conversation AiConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade) + role String // "user" | "assistant" + content String + suggestions Json? + pipelineYaml String? + createdById String? + createdBy User? @relation("AiMessageCreatedBy", fields: [createdById], references: [id], onDelete: SetNull) + createdAt DateTime @default(now()) + + @@index([conversationId, createdAt]) +} diff --git a/src/app/(dashboard)/pipelines/[id]/page.tsx b/src/app/(dashboard)/pipelines/[id]/page.tsx index 1bebf069..fe2d3dd3 100644 --- a/src/app/(dashboard)/pipelines/[id]/page.tsx +++ b/src/app/(dashboard)/pipelines/[id]/page.tsx @@ -540,6 +540,7 @@ function PipelineBuilderInner({ pipelineId }: { pipelineId: string }) { )} diff --git a/src/app/api/agent/heartbeat/route.ts b/src/app/api/agent/heartbeat/route.ts index 6c9f8c95..88338c6b 100644 --- a/src/app/api/agent/heartbeat/route.ts +++ b/src/app/api/agent/heartbeat/route.ts @@ -362,25 +362,46 @@ export async function POST(request: Request) { }); if (!request || request.status !== "PENDING") continue; - await prisma.eventSample.create({ - data: { - requestId: result.requestId, - pipelineId: request.pipelineId, - componentKey: result.componentKey ?? "", - events: (result.events ?? []) as Prisma.InputJsonValue, - schema: (result.schema ?? []) as Prisma.InputJsonValue, - error: result.error ?? null, - }, - }); + try { + await prisma.eventSample.create({ + data: { + requestId: result.requestId, + pipelineId: request.pipelineId, + componentKey: result.componentKey ?? "", + events: (result.events ?? []) as Prisma.InputJsonValue, + schema: (result.schema ?? 
[]) as Prisma.InputJsonValue, + error: result.error ?? null, + }, + }); - await prisma.eventSampleRequest.update({ - where: { id: result.requestId }, - data: { - status: result.error ? "ERROR" : "COMPLETED", - completedAt: new Date(), - nodeId: agent.nodeId, - }, - }); + await prisma.eventSampleRequest.update({ + where: { id: result.requestId }, + data: { + status: result.error ? "ERROR" : "COMPLETED", + completedAt: new Date(), + nodeId: agent.nodeId, + }, + }); + } catch (err) { + // Only mark as ERROR if the EventSample write itself failed. + // If another agent already submitted a successful result, the + // request may already be COMPLETED — avoid overwriting that. + const current = await prisma.eventSampleRequest.findUnique({ + where: { id: result.requestId }, + select: { status: true }, + }); + if (current && current.status === "PENDING") { + await prisma.eventSampleRequest.update({ + where: { id: result.requestId }, + data: { + status: "ERROR", + completedAt: new Date(), + nodeId: agent.nodeId, + }, + }); + } + console.error("EventSample write error:", err); + } } } diff --git a/src/app/api/ai/pipeline/route.ts b/src/app/api/ai/pipeline/route.ts index bd4ed1da..574d45a8 100644 --- a/src/app/api/ai/pipeline/route.ts +++ b/src/app/api/ai/pipeline/route.ts @@ -4,6 +4,9 @@ import { auth } from "@/auth"; import { prisma } from "@/lib/prisma"; import { streamCompletion } from "@/server/services/ai"; import { buildPipelineSystemPrompt } from "@/lib/ai/prompts"; +import { writeAuditLog } from "@/server/services/audit"; +import type { AiReviewResponse } from "@/lib/ai/types"; +import { Prisma } from "@/generated/prisma"; export async function POST(request: Request) { const session = await auth(); @@ -20,6 +23,8 @@ export async function POST(request: Request) { mode: "generate" | "review"; currentYaml?: string; environmentName?: string; + pipelineId?: string; + conversationId?: string; }; try { @@ -67,6 +72,84 @@ export async function POST(request: Request) { }); } + 
// Validate pipelineId for review mode + if (body.mode === "review" && !body.pipelineId) { + return new Response(JSON.stringify({ error: "pipelineId is required for review mode" }), { + status: 400, + headers: { "Content-Type": "application/json" }, + }); + } + + // --- Conversation persistence (review mode only) --- + let conversationId = body.conversationId; + let priorMessages: Array<{ role: "user" | "assistant"; content: string }> = []; + + if (body.mode === "review" && body.pipelineId) { + // Verify pipelineId belongs to the team + const pipeline = await prisma.pipeline.findUnique({ + where: { id: body.pipelineId }, + select: { environment: { select: { teamId: true } } }, + }); + if (!pipeline || pipeline.environment.teamId !== body.teamId) { + return new Response(JSON.stringify({ error: "Pipeline not found" }), { + status: 404, + headers: { "Content-Type": "application/json" }, + }); + } + + if (!conversationId) { + const conversation = await prisma.aiConversation.create({ + data: { + pipelineId: body.pipelineId, + createdById: session.user.id, + }, + }); + conversationId = conversation.id; + } else { + // Verify conversationId belongs to this pipeline + const existing = await prisma.aiConversation.findUnique({ + where: { id: conversationId }, + select: { pipelineId: true }, + }); + if (!existing || existing.pipelineId !== body.pipelineId) { + return new Response(JSON.stringify({ error: "Conversation not found" }), { + status: 404, + headers: { "Content-Type": "application/json" }, + }); + } + } + + await prisma.aiMessage.create({ + data: { + conversationId, + role: "user", + content: body.prompt, + pipelineYaml: body.currentYaml ?? 
null, + createdById: session.user.id, + }, + }); + + // Get most recent 10 messages (desc) then reverse to chronological order + const history = await prisma.aiMessage.findMany({ + where: { conversationId }, + orderBy: { createdAt: "desc" }, + take: 10, + select: { role: true, content: true }, + }); + history.reverse(); + + // Exclude the message we just saved (last user msg) — it goes as the current prompt + priorMessages = history.slice(0, -1).map((m) => ({ + role: m.role as "user" | "assistant", + content: m.content, + })); + } + + const messages: Array<{ role: "user" | "assistant"; content: string }> = [ + ...priorMessages, + { role: "user", content: body.prompt }, + ]; + const mode = body.mode; const systemPrompt = buildPipelineSystemPrompt({ @@ -76,20 +159,75 @@ export async function POST(request: Request) { }); const encoder = new TextEncoder(); + let fullResponse = ""; const stream = new ReadableStream({ async start(controller) { try { + if (conversationId) { + controller.enqueue( + encoder.encode(`data: ${JSON.stringify({ conversationId })}\n\n`) + ); + } + await streamCompletion({ teamId: body.teamId, systemPrompt, - userPrompt: body.prompt, + messages, onToken: (token) => { + fullResponse += token; const data = JSON.stringify({ token }); controller.enqueue(encoder.encode(`data: ${data}\n\n`)); }, signal: request.signal, }); + if (body.mode === "review" && conversationId) { + let parsedSuggestions = null; + try { + const parsed: AiReviewResponse = JSON.parse(fullResponse); + if (parsed.summary && Array.isArray(parsed.suggestions)) { + parsedSuggestions = parsed.suggestions; + } + } catch { + // Not valid JSON — store as raw text + } + + try { + await prisma.aiMessage.create({ + data: { + conversationId, + role: "assistant", + content: fullResponse, + suggestions: (parsedSuggestions as unknown as Prisma.InputJsonValue) ?? 
undefined, + createdById: session.user.id, + }, + }); + } catch (err) { + console.error("Failed to persist AI response:", err); + } + + const pipelineForAudit = await prisma.pipeline.findUnique({ + where: { id: body.pipelineId! }, + select: { environmentId: true, environment: { select: { teamId: true } } }, + }); + + writeAuditLog({ + userId: session.user.id, + action: "pipeline.ai_review", + entityType: "Pipeline", + entityId: body.pipelineId!, + metadata: { + conversationId, + mode: body.mode, + suggestionCount: parsedSuggestions?.length ?? 0, + }, + teamId: pipelineForAudit?.environment.teamId ?? null, + environmentId: pipelineForAudit?.environmentId ?? null, + userEmail: session.user.email ?? null, + userName: session.user.name ?? null, + }).catch(() => {}); + } + controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)); } catch (err) { const message = err instanceof Error ? err.message : "AI request failed"; diff --git a/src/app/api/ai/vrl/route.ts b/src/app/api/ai/vrl/route.ts index cc8b348c..cc1c452d 100644 --- a/src/app/api/ai/vrl/route.ts +++ b/src/app/api/ai/vrl/route.ts @@ -76,7 +76,7 @@ export async function POST(request: Request) { await streamCompletion({ teamId: body.teamId, systemPrompt, - userPrompt: body.prompt, + messages: [{ role: "user", content: body.prompt }], onToken: (token) => { const data = JSON.stringify({ token }); controller.enqueue(encoder.encode(`data: ${data}\n\n`)); diff --git a/src/components/flow/ai-message-bubble.tsx b/src/components/flow/ai-message-bubble.tsx new file mode 100644 index 00000000..d4113f33 --- /dev/null +++ b/src/components/flow/ai-message-bubble.tsx @@ -0,0 +1,157 @@ +"use client"; + +import { useState, useMemo } from "react"; +import { Bot, User } from "lucide-react"; +import { Button } from "@/components/ui/button"; +import { AiSuggestionCard } from "./ai-suggestion-card"; +import { detectConflicts } from "@/lib/ai/conflict-detector"; +import type { AiSuggestion, SuggestionStatus } 
from "@/lib/ai/types"; +import type { ConversationMessage } from "@/hooks/use-ai-conversation"; + +interface AiMessageBubbleProps { + message: ConversationMessage; + suggestionStatuses: Map; + onApplySelected: (messageId: string, suggestions: AiSuggestion[]) => void; +} + +export function AiMessageBubble({ + message, + suggestionStatuses, + onApplySelected, +}: AiMessageBubbleProps) { + const [selectedIds, setSelectedIds] = useState>(new Set()); + + const suggestions = useMemo(() => message.suggestions ?? [], [message.suggestions]); + const hasSuggestions = message.role === "assistant" && suggestions.length > 0; + + // Parse summary from assistant JSON content + const summary = useMemo(() => { + if (message.role !== "assistant") return null; + if (!hasSuggestions) return null; + try { + const parsed = JSON.parse(message.content); + return parsed.summary as string | undefined; + } catch { + return null; + } + }, [message.content, message.role, hasSuggestions]); + + // Detect conflicts among selected suggestions + const conflicts = useMemo(() => { + const selected = suggestions.filter((s) => selectedIds.has(s.id)); + return detectConflicts(selected); + }, [suggestions, selectedIds]); + + const conflictMap = useMemo(() => { + const map = new Map(); + for (const c of conflicts) { + map.set(c.a, c.reason); + map.set(c.b, c.reason); + } + return map; + }, [conflicts]); + + const actionableSuggestions = suggestions.filter( + (s) => suggestionStatuses.get(s.id) === "actionable", + ); + + const selectedSuggestions = suggestions.filter((s) => selectedIds.has(s.id)); + + const handleToggle = (id: string) => { + setSelectedIds((prev) => { + const next = new Set(prev); + if (next.has(id)) { + next.delete(id); + } else { + next.add(id); + } + return next; + }); + }; + + const handleApplyAll = () => { + if (actionableSuggestions.length > 0) { + onApplySelected(message.id, actionableSuggestions); + } + }; + + const handleApplySelected = () => { + if (selectedSuggestions.length > 
0) { + onApplySelected(message.id, selectedSuggestions); + } + }; + + if (message.role === "user") { + return ( +
+
+ +
+
+

{message.content}

+
+
+ ); + } + + // Assistant message + if (!hasSuggestions) { + // Fallback: render raw text content + return ( +
+
+ +
+
+
{message.content}
+
+
+ ); + } + + return ( +
+
+ +
+
+ {summary && ( +

{summary}

+ )} + +
+ {suggestions.map((s) => ( + + ))} +
+ + {actionableSuggestions.length > 0 && ( +
+ + +
+ )} +
+
+ ); +} diff --git a/src/components/flow/ai-pipeline-dialog.tsx b/src/components/flow/ai-pipeline-dialog.tsx index ad9a1214..31727159 100644 --- a/src/components/flow/ai-pipeline-dialog.tsx +++ b/src/components/flow/ai-pipeline-dialog.tsx @@ -1,8 +1,8 @@ // src/components/flow/ai-pipeline-dialog.tsx "use client"; -import { useState, useRef, useCallback } from "react"; -import { Loader2, RotateCcw, Sparkles, AlertTriangle } from "lucide-react"; +import { useState, useRef, useCallback, useMemo, useEffect } from "react"; +import { Loader2, RotateCcw, Sparkles, AlertTriangle, MessageSquarePlus } from "lucide-react"; import { Button } from "@/components/ui/button"; import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; @@ -14,48 +14,112 @@ import { DialogTitle, } from "@/components/ui/dialog"; import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"; +import { ScrollArea } from "@/components/ui/scroll-area"; import { useTeamStore } from "@/stores/team-store"; import { useFlowStore } from "@/stores/flow-store"; import { generateVectorYaml, importVectorConfig } from "@/lib/config-generator"; import { toast } from "sonner"; +import { useAiConversation } from "@/hooks/use-ai-conversation"; +import { AiMessageBubble } from "./ai-message-bubble"; +import { validateSuggestions } from "@/lib/ai/suggestion-validator"; +import { detectOutdatedSuggestions } from "@/lib/ai/suggestion-validator"; +import type { AiSuggestion, SuggestionStatus } from "@/lib/ai/types"; interface AiPipelineDialogProps { open: boolean; onOpenChange: (open: boolean) => void; + pipelineId: string; environmentName?: string; } export function AiPipelineDialog({ open, onOpenChange, + pipelineId, environmentName, }: AiPipelineDialogProps) { - const [prompt, setPrompt] = useState(""); - const [result, setResult] = useState(""); - const [isStreaming, setIsStreaming] = useState(false); - const [error, setError] = useState(null); const [mode, setMode] = 
useState<"generate" | "review">("generate"); - const abortRef = useRef(null); + + // --- Generate tab state (unchanged from original) --- + const [genPrompt, setGenPrompt] = useState(""); + const [genResult, setGenResult] = useState(""); + const [genIsStreaming, setGenIsStreaming] = useState(false); + const [genError, setGenError] = useState(null); + const genAbortRef = useRef(null); const selectedTeamId = useTeamStore((s) => s.selectedTeamId); const nodes = useFlowStore((s) => s.nodes); const edges = useFlowStore((s) => s.edges); const globalConfig = useFlowStore((s) => s.globalConfig); const loadGraph = useFlowStore((s) => s.loadGraph); + const applySuggestions = useFlowStore((s) => s.applySuggestions); const currentYaml = nodes.length > 0 ? generateVectorYaml(nodes, edges, globalConfig) : undefined; - const handleSubmit = useCallback( + // --- Review tab state --- + const [reviewPrompt, setReviewPrompt] = useState(""); + const conversation = useAiConversation({ + pipelineId, + currentYaml, + environmentName, + }); + const messagesEndRef = useRef(null); + + // Auto-scroll to bottom when messages change + useEffect(() => { + messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }); + }, [conversation.messages, conversation.streamingContent]); + + // Compute suggestion statuses across all messages + const suggestionStatuses = useMemo(() => { + const statuses = new Map(); + + for (const msg of conversation.messages) { + if (msg.role !== "assistant" || !msg.suggestions) continue; + + // Validate references against current canvas + const validation = validateSuggestions(msg.suggestions, nodes); + for (const [id, status] of validation) { + statuses.set(id, status); + } + + // Check for outdated suggestions + const outdated = detectOutdatedSuggestions( + msg.suggestions, + msg.pipelineYaml ?? null, + currentYaml ?? 
"", + ); + for (const id of outdated) { + if (statuses.get(id) === "actionable") { + statuses.set(id, "outdated"); + } + } + + // Check for already-applied suggestions (from server data) + for (const s of msg.suggestions) { + const raw = s as unknown as Record; + if (raw.appliedAt) { + statuses.set(s.id, "applied"); + } + } + } + + return statuses; + }, [conversation.messages, nodes, currentYaml]); + + // --- Generate tab handlers (identical to original) --- + + const handleGenSubmit = useCallback( async (e?: React.FormEvent) => { e?.preventDefault(); - if (!prompt.trim() || !selectedTeamId || isStreaming) return; + if (!genPrompt.trim() || !selectedTeamId || genIsStreaming) return; - setIsStreaming(true); - setResult(""); - setError(null); + setGenIsStreaming(true); + setGenResult(""); + setGenError(null); - abortRef.current = new AbortController(); + genAbortRef.current = new AbortController(); try { const response = await fetch("/api/ai/pipeline", { @@ -63,12 +127,12 @@ export function AiPipelineDialog({ headers: { "Content-Type": "application/json" }, body: JSON.stringify({ teamId: selectedTeamId, - prompt: prompt.trim(), - mode, - currentYaml: mode === "review" ? currentYaml : undefined, + prompt: genPrompt.trim(), + mode: "generate", + currentYaml: undefined, environmentName, }), - signal: abortRef.current.signal, + signal: genAbortRef.current.signal, }); if (!response.ok) { @@ -99,7 +163,7 @@ export function AiPipelineDialog({ if (data.done) break; if (data.error) throw new Error(data.error); if (data.token) { - setResult((prev) => prev + data.token); + setGenResult((prev) => prev + data.token); } } catch (parseErr) { if (parseErr instanceof Error && parseErr.message !== "Unexpected end of JSON input") { @@ -110,19 +174,18 @@ export function AiPipelineDialog({ } } catch (err) { if (err instanceof Error && err.name === "AbortError") return; - setError(err instanceof Error ? err.message : "AI request failed"); + setGenError(err instanceof Error ? 
err.message : "AI request failed"); } finally { - setIsStreaming(false); - abortRef.current = null; + setGenIsStreaming(false); + genAbortRef.current = null; } }, - [prompt, selectedTeamId, currentYaml, mode, environmentName, isStreaming], + [genPrompt, selectedTeamId, environmentName, genIsStreaming], ); const handleApplyToCanvas = () => { try { - // Strip any markdown fencing the LLM might have added - let yaml = result.trim(); + let yaml = genResult.trim(); if (yaml.startsWith("```yaml")) yaml = yaml.slice(7); if (yaml.startsWith("```")) yaml = yaml.slice(3); if (yaml.endsWith("```")) yaml = yaml.slice(0, -3); @@ -132,10 +195,8 @@ export function AiPipelineDialog({ importVectorConfig(yaml); if (nodes.length === 0) { - // Empty pipeline: replace loadGraph(newNodes, newEdges, importedGlobalConfig); } else { - // Existing pipeline: add alongside (offset positions to avoid overlap) const maxY = Math.max(...nodes.map((n) => n.position.y), 0); const offsetNodes = newNodes.map((n) => ({ ...n, @@ -149,8 +210,8 @@ export function AiPipelineDialog({ toast.success(`Applied ${newNodes.length} components to canvas`); onOpenChange(false); - setResult(""); - setPrompt(""); + setGenResult(""); + setGenPrompt(""); } catch (err) { toast.error("Failed to parse YAML", { description: err instanceof Error ? 
err.message : "Invalid YAML output", @@ -158,13 +219,46 @@ export function AiPipelineDialog({ } }; - const handleCancel = () => { - abortRef.current?.abort(); + const handleGenCancel = () => { + genAbortRef.current?.abort(); }; + // --- Review tab handlers --- + + const handleReviewSubmit = useCallback( + (e?: React.FormEvent) => { + e?.preventDefault(); + if (!reviewPrompt.trim()) return; + const prompt = reviewPrompt; + setReviewPrompt(""); + conversation.sendReview(prompt); + }, + [reviewPrompt, conversation], + ); + + const handleApplySelected = useCallback( + (messageId: string, suggestions: AiSuggestion[]) => { + const { applied, errors } = applySuggestions(suggestions); + + if (applied > 0) { + toast.success(`Applied ${applied} suggestion${applied > 1 ? "s" : ""} to canvas`); + conversation.markSuggestionsApplied( + messageId, + suggestions.map((s) => s.id), + ); + } + if (errors.length > 0) { + toast.error(`${errors.length} suggestion${errors.length > 1 ? "s" : ""} failed`, { + description: errors[0], + }); + } + }, + [applySuggestions, conversation], + ); + return ( - + @@ -175,7 +269,7 @@ export function AiPipelineDialog({ - setMode(v as "generate" | "review")}> + setMode(v as "generate" | "review")} className="flex flex-col flex-1 overflow-hidden"> Generate @@ -183,88 +277,154 @@ export function AiPipelineDialog({ - + {/* ---- Generate tab (unchanged) ---- */} +
-
+ setPrompt(e.target.value)} + value={genPrompt} + onChange={(e) => setGenPrompt(e.target.value)} placeholder="Collect K8s logs, drop debug, send to Datadog and S3" - disabled={isStreaming} + disabled={genIsStreaming} /> - {isStreaming ? ( - ) : ( - )}
-
- -
- -
- setPrompt(e.target.value)} - placeholder="Is my pipeline config optimal? Any issues?" - disabled={isStreaming} - /> - {isStreaming ? ( - - ) : ( - + {genError && ( +
+ + {genError} +
+ )} + + {(genResult || genIsStreaming) && ( +
+ +
+ {genResult || ( + + + Generating pipeline... + + )} +
+ {!genIsStreaming && genResult && ( +
+ + +
)} - -
+
+ )}
-
- {error && ( -
- - {error} -
- )} - - {(result || isStreaming) && ( -
- -
- {result || ( - - - {mode === "generate" ? "Generating pipeline..." : "Reviewing pipeline..."} - - )} -
- {!isStreaming && result && ( -
- {mode === "generate" && ( - - )} - + {/* ---- Review tab (conversation thread) ---- */} + + {conversation.isLoading ? ( +
+
+ ) : ( + <> + {/* Message thread */} + +
+ {conversation.messages.length === 0 && !conversation.isStreaming && ( +

+ Ask the AI to review your pipeline configuration. +

+ )} + + {conversation.messages.map((msg) => ( + + ))} + + {conversation.isStreaming && conversation.streamingContent && ( +
+
+ +
+
+
+ {conversation.streamingContent} +
+
+
+ )} + + {conversation.isStreaming && !conversation.streamingContent && ( +
+ + Analyzing pipeline... +
+ )} + +
+
+ + + {conversation.error && ( +
+ + {conversation.error} +
+ )} + + {/* Input pinned at bottom */} +
+
+ setReviewPrompt(e.target.value)} + placeholder="Ask about your pipeline..." + disabled={conversation.isStreaming} + /> + {conversation.isStreaming ? ( + + ) : ( + + )} +
+ {conversation.messages.length > 0 && ( + + )} +
+ )} -
- )} +
+
); diff --git a/src/components/flow/ai-suggestion-card.tsx b/src/components/flow/ai-suggestion-card.tsx new file mode 100644 index 00000000..10a9e008 --- /dev/null +++ b/src/components/flow/ai-suggestion-card.tsx @@ -0,0 +1,150 @@ +"use client"; + +import { Checkbox } from "@/components/ui/checkbox"; +import { Badge } from "@/components/ui/badge"; +import { cn } from "@/lib/utils"; +import type { AiSuggestion, SuggestionStatus } from "@/lib/ai/types"; +import { AlertTriangle } from "lucide-react"; + +interface AiSuggestionCardProps { + suggestion: AiSuggestion; + status: SuggestionStatus; + isSelected: boolean; + hasConflict: boolean; + conflictReason?: string; + onToggle: (id: string) => void; +} + +const TYPE_LABELS: Record = { + modify_config: "Config Change", + add_component: "Add Component", + remove_component: "Remove Component", + modify_connections: "Rewire", +}; + +const PRIORITY_COLORS: Record = { + high: "bg-red-500/15 text-red-700 dark:text-red-400", + medium: "bg-amber-500/15 text-amber-700 dark:text-amber-400", + low: "bg-green-500/15 text-green-700 dark:text-green-400", +}; + +const STATUS_BADGES: Partial> = { + applied: { label: "Applied", className: "bg-green-500/15 text-green-700 dark:text-green-400" }, + outdated: { label: "Outdated", className: "bg-amber-500/15 text-amber-700 dark:text-amber-400" }, + invalid: { label: "Invalid", className: "bg-red-500/15 text-red-700 dark:text-red-400" }, +}; + +export function AiSuggestionCard({ + suggestion, + status, + isSelected, + hasConflict, + conflictReason, + onToggle, +}: AiSuggestionCardProps) { + const isDisabled = status === "applied" || status === "invalid"; + const statusBadge = STATUS_BADGES[status]; + + return ( +
+
+ onToggle(suggestion.id)} + className="mt-0.5" + /> + +
+
+ + {suggestion.title} + + + {statusBadge && ( + + {statusBadge.label} + + )} + + + {suggestion.priority} + + + + {TYPE_LABELS[suggestion.type]} + +
+ +

+ {renderDescription(suggestion)} +

+ + {suggestion.type === "modify_config" && ( +
+ {Object.entries(suggestion.changes).map(([key, value]) => ( +
+ {key}:{" "} + {JSON.stringify(value)} +
+ ))} +
+ )} + + {hasConflict && conflictReason && ( +
+ + {conflictReason} +
+ )} +
+
+
+ ); +} + +function renderDescription(suggestion: AiSuggestion): React.ReactNode { + const desc = suggestion.description; + + // Highlight componentKey references in the description + const componentKeys: string[] = []; + if (suggestion.type === "modify_config" || suggestion.type === "remove_component") { + componentKeys.push(suggestion.componentKey); + } + if (suggestion.type === "add_component") { + componentKeys.push(suggestion.insertAfter, ...suggestion.connectTo); + } + if (suggestion.type === "modify_connections") { + for (const e of suggestion.edgeChanges) { + componentKeys.push(e.from, e.to); + } + } + + if (componentKeys.length === 0) return desc; + + const uniqueKeys = [...new Set(componentKeys)]; + const pattern = new RegExp(`(${uniqueKeys.map(k => k.replace(/[.*+?^${}()|[\]\\]/g, "\\$&")).join("|")})`, "g"); + const parts = desc.split(pattern); + + return parts.map((part, i) => + uniqueKeys.includes(part) ? ( + + {part} + + ) : ( + part + ), + ); +} diff --git a/src/components/ui/checkbox.tsx b/src/components/ui/checkbox.tsx new file mode 100644 index 00000000..f5a7e433 --- /dev/null +++ b/src/components/ui/checkbox.tsx @@ -0,0 +1,32 @@ +"use client" + +import * as React from "react" +import { CheckIcon } from "lucide-react" +import { Checkbox as CheckboxPrimitive } from "radix-ui" + +import { cn } from "@/lib/utils" + +function Checkbox({ + className, + ...props +}: React.ComponentProps) { + return ( + + + + + + ) +} + +export { Checkbox } diff --git a/src/hooks/use-ai-conversation.ts b/src/hooks/use-ai-conversation.ts new file mode 100644 index 00000000..3ad28a2c --- /dev/null +++ b/src/hooks/use-ai-conversation.ts @@ -0,0 +1,238 @@ +"use client"; + +import { useState, useRef, useCallback } from "react"; +import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; +import { useTRPC } from "@/trpc/client"; +import { useTeamStore } from "@/stores/team-store"; +import type { AiSuggestion } from "@/lib/ai/types"; +import { 
parseAiReviewResponse } from "@/lib/ai/suggestion-validator"; + +export interface ConversationMessage { + id: string; + role: "user" | "assistant"; + content: string; + suggestions?: AiSuggestion[]; + pipelineYaml?: string | null; + createdAt: string; + createdBy?: { id: string; name: string | null; image: string | null } | null; +} + +interface UseAiConversationOptions { + pipelineId: string; + currentYaml?: string; + environmentName?: string; +} + +export function useAiConversation({ + pipelineId, + currentYaml, + environmentName, +}: UseAiConversationOptions) { + const trpc = useTRPC(); + const queryClient = useQueryClient(); + const selectedTeamId = useTeamStore((s) => s.selectedTeamId); + + const [messages, setMessages] = useState([]); + const [conversationId, setConversationId] = useState(null); + const [isStreaming, setIsStreaming] = useState(false); + const [streamingContent, setStreamingContent] = useState(""); + const [error, setError] = useState(null); + const abortRef = useRef(null); + const isNewConversationRef = useRef(false); + + // Load existing conversation + const conversationQuery = useQuery({ + ...trpc.ai.getConversation.queryOptions({ pipelineId }), + enabled: !!pipelineId, + }); + + // Sync loaded conversation into local state + const loadedConversation = conversationQuery.data; + if (loadedConversation && !conversationId && messages.length === 0 && !isStreaming && !isNewConversationRef.current) { + setConversationId(loadedConversation.id); + setMessages( + loadedConversation.messages.map((m) => ({ + id: m.id, + role: m.role as "user" | "assistant", + content: m.content, + suggestions: m.suggestions as unknown as AiSuggestion[] | undefined, + pipelineYaml: m.pipelineYaml, + createdAt: m.createdAt instanceof Date ? 
m.createdAt.toISOString() : String(m.createdAt), + createdBy: m.createdBy, + })), + ); + } + + const markAppliedMutation = useMutation( + trpc.ai.markSuggestionsApplied.mutationOptions({ + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: trpc.ai.getConversation.queryKey({ pipelineId }) }); + }, + }), + ); + + const sendReview = useCallback( + async (prompt: string) => { + if (!prompt.trim() || !selectedTeamId || isStreaming) return; + + isNewConversationRef.current = false; + setIsStreaming(true); + setStreamingContent(""); + setError(null); + + // Add optimistic user message + const userMessage: ConversationMessage = { + id: `temp-user-${Date.now()}`, + role: "user", + content: prompt.trim(), + pipelineYaml: currentYaml ?? null, + createdAt: new Date().toISOString(), + }; + setMessages((prev) => [...prev, userMessage]); + + abortRef.current = new AbortController(); + let fullResponse = ""; + + try { + const response = await fetch("/api/ai/pipeline", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + teamId: selectedTeamId, + prompt: prompt.trim(), + mode: "review", + currentYaml, + environmentName, + pipelineId, + conversationId, + }), + signal: abortRef.current.signal, + }); + + if (!response.ok) { + const errData = await response.json().catch(() => ({ error: "Request failed" })); + throw new Error(errData.error || `HTTP ${response.status}`); + } + + const reader = response.body?.getReader(); + if (!reader) throw new Error("No response stream"); + + const decoder = new TextDecoder(); + let buffer = ""; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() ?? 
""; + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed || !trimmed.startsWith("data: ")) continue; + + try { + const data = JSON.parse(trimmed.slice(6)); + if (data.conversationId) { + setConversationId(data.conversationId); + continue; + } + if (data.done) break; + if (data.error) throw new Error(data.error); + if (data.token) { + fullResponse += data.token; + setStreamingContent(fullResponse); + } + } catch (parseErr) { + if (parseErr instanceof Error && parseErr.message !== "Unexpected end of JSON input") { + throw parseErr; + } + } + } + } + + // Parse the completed response + const parsed = parseAiReviewResponse(fullResponse); + + const assistantMessage: ConversationMessage = { + id: `temp-assistant-${Date.now()}`, + role: "assistant", + content: fullResponse, + suggestions: parsed?.suggestions, + pipelineYaml: currentYaml ?? null, + createdAt: new Date().toISOString(), + }; + setMessages((prev) => [...prev, assistantMessage]); + setStreamingContent(""); + + // Refetch to sync local state with server-persisted messages (real IDs) + const refetched = await queryClient.fetchQuery({ + ...trpc.ai.getConversation.queryOptions({ pipelineId }), + staleTime: 0, + }); + if (refetched?.messages) { + setMessages( + refetched.messages.map((m) => ({ + id: m.id, + role: m.role as "user" | "assistant", + content: m.content, + suggestions: m.suggestions as unknown as AiSuggestion[] | undefined, + pipelineYaml: m.pipelineYaml, + createdAt: m.createdAt instanceof Date ? m.createdAt.toISOString() : String(m.createdAt), + createdBy: m.createdBy, + })), + ); + } + } catch (err) { + if (err instanceof Error && err.name === "AbortError") return; + setError(err instanceof Error ? 
err.message : "AI request failed"); + } finally { + setIsStreaming(false); + abortRef.current = null; + } + }, + [selectedTeamId, isStreaming, currentYaml, environmentName, pipelineId, conversationId, queryClient, trpc], + ); + + const startNewConversation = useCallback(() => { + isNewConversationRef.current = true; + queryClient.removeQueries({ queryKey: trpc.ai.getConversation.queryKey({ pipelineId }) }); + setMessages([]); + setConversationId(null); + setStreamingContent(""); + setError(null); + }, [queryClient, trpc, pipelineId]); + + const markSuggestionsApplied = useCallback( + (messageId: string, suggestionIds: string[]) => { + if (!conversationId) return; + if (messageId.startsWith("temp-")) return; // Wait for server-persisted IDs + + markAppliedMutation.mutate({ + pipelineId, + conversationId, + messageId, + suggestionIds, + }); + }, + [conversationId, pipelineId, markAppliedMutation], + ); + + const cancelStreaming = useCallback(() => { + abortRef.current?.abort(); + }, []); + + return { + messages, + conversationId, + isStreaming, + streamingContent, + error, + isLoading: conversationQuery.isLoading, + sendReview, + startNewConversation, + markSuggestionsApplied, + cancelStreaming, + }; +} diff --git a/src/lib/ai/conflict-detector.ts b/src/lib/ai/conflict-detector.ts new file mode 100644 index 00000000..ab2611f3 --- /dev/null +++ b/src/lib/ai/conflict-detector.ts @@ -0,0 +1,107 @@ +// src/lib/ai/conflict-detector.ts +import type { AiSuggestion } from "./types"; + +export interface ConflictPair { + a: string; + b: string; + reason: string; +} + +/** + * Detect conflicts between selected suggestions. + * Returns pairs of conflicting suggestion IDs with reasons. 
+ */ +export function detectConflicts(suggestions: AiSuggestion[]): ConflictPair[] { + const conflicts: ConflictPair[] = []; + + for (let i = 0; i < suggestions.length; i++) { + for (let j = i + 1; j < suggestions.length; j++) { + const conflict = checkPairConflict(suggestions[i], suggestions[j]); + if (conflict) { + conflicts.push({ a: suggestions[i].id, b: suggestions[j].id, reason: conflict }); + } + } + } + + return conflicts; +} + +function checkPairConflict(a: AiSuggestion, b: AiSuggestion): string | null { + // Same-type: two modify_config on same component with overlapping keys + if (a.type === "modify_config" && b.type === "modify_config") { + if (a.componentKey === b.componentKey) { + const keysA = Object.keys(a.changes); + const keysB = Object.keys(b.changes); + const overlap = keysA.filter((k) => keysB.includes(k)); + if (overlap.length > 0) { + return `Both modify "${a.componentKey}" config keys: ${overlap.join(", ")}`; + } + } + } + + // Same-type: contradicting modify_connections + if (a.type === "modify_connections" && b.type === "modify_connections") { + for (const ea of a.edgeChanges) { + for (const eb of b.edgeChanges) { + if (ea.from === eb.from && ea.to === eb.to) { + if (ea.action !== eb.action) { + return `Contradicting edge changes: ${ea.from} to ${ea.to}`; + } + if (ea.action === "add" && eb.action === "add") { + return `Duplicate edge addition: ${ea.from} to ${ea.to}`; + } + } + } + } + } + + // Cross-type: remove_component vs anything referencing that component + if (a.type === "remove_component" || b.type === "remove_component") { + const remover = (a.type === "remove_component" ? a : b) as AiSuggestion & { type: "remove_component" }; + const other = a.type === "remove_component" ? 
b : a; + const removedKey = remover.componentKey; + const referencedKeys = getReferencedComponentKeys(other); + if (referencedKeys.has(removedKey)) { + return `"${removedKey}" is removed by one suggestion but referenced by another`; + } + } + + // Cross-type: add_component connectTo vs modify_connections removing same edge + if ( + (a.type === "add_component" && b.type === "modify_connections") || + (a.type === "modify_connections" && b.type === "add_component") + ) { + const adder = a.type === "add_component" ? a : b as AiSuggestion & { type: "add_component" }; + const modifier = a.type === "modify_connections" ? a : b as AiSuggestion & { type: "modify_connections" }; + for (const edge of modifier.edgeChanges) { + if (edge.action === "remove" && adder.connectTo.includes(edge.to)) { + return `add_component connects to "${edge.to}" but modify_connections removes an edge to it`; + } + } + } + + return null; +} + +function getReferencedComponentKeys(s: AiSuggestion): Set { + const keys = new Set(); + switch (s.type) { + case "modify_config": + keys.add(s.componentKey); + break; + case "add_component": + keys.add(s.insertAfter); + for (const k of s.connectTo) keys.add(k); + break; + case "remove_component": + keys.add(s.componentKey); + break; + case "modify_connections": + for (const e of s.edgeChanges) { + keys.add(e.from); + keys.add(e.to); + } + break; + } + return keys; +} diff --git a/src/lib/ai/prompts.ts b/src/lib/ai/prompts.ts index bfe5cb92..66524141 100644 --- a/src/lib/ai/prompts.ts +++ b/src/lib/ai/prompts.ts @@ -61,10 +61,37 @@ export function buildPipelineSystemPrompt(context: { } else { parts.push( "You are a Vector pipeline configuration reviewer.", - "Analyze the provided Vector pipeline YAML and provide improvement suggestions.", - "Focus on: performance, correctness, best practices, and potential issues.", - "If the user asks for a revised config, output the complete corrected YAML with no markdown fencing.", - "Otherwise, provide suggestions as 
concise text.", + "Analyze the provided Vector pipeline YAML and return your response as a JSON object.", + "", + "Response format (return ONLY this JSON, no markdown fencing, no extra text):", + JSON.stringify({ + summary: "2-3 sentence analysis of the pipeline", + suggestions: [ + { + id: "s1", + type: "modify_config", + title: "Short title", + description: "Why this helps", + priority: "high|medium|low", + componentKey: "existing_component_key", + changes: { "config.field": "new_value" }, + }, + ], + }, null, 2), + "", + "Suggestion types:", + '- modify_config: { type: "modify_config", componentKey, changes: { field: value } }', + '- add_component: { type: "add_component", component: { key, componentType, kind: "source"|"transform"|"sink", config }, insertAfter: "existing_key", connectTo: ["downstream_key"] }', + '- remove_component: { type: "remove_component", componentKey, reconnect: true|false }', + '- modify_connections: { type: "modify_connections", edgeChanges: [{ action: "add"|"remove", from: "key", to: "key" }] }', + "", + "Rules:", + "- Each suggestion needs a unique id (s1, s2, s3...)", + "- componentKey values MUST match real keys from the provided YAML", + "- Focus on: performance, correctness, best practices, potential issues", + "- Prioritize: high = likely bug or major perf issue, medium = optimization, low = cleanup", + "- Return valid JSON only. No markdown, no code fences, no commentary outside the JSON.", + "- Even in follow-up messages, always return the full JSON object. 
Never mix prose with JSON.", ); } diff --git a/src/lib/ai/suggestion-applier.ts b/src/lib/ai/suggestion-applier.ts new file mode 100644 index 00000000..efd82a0f --- /dev/null +++ b/src/lib/ai/suggestion-applier.ts @@ -0,0 +1,203 @@ +// src/lib/ai/suggestion-applier.ts +import type { Node, Edge } from "@xyflow/react"; +import type { AiSuggestion } from "./types"; +import { findComponentDef } from "@/lib/vector/catalog"; +import { generateComponentKey } from "@/lib/component-key"; +import { generateId } from "@/lib/utils"; + +interface ApplyResult { + nodes: Node[]; + edges: Edge[]; + error?: string; +} + +/** + * Apply a single suggestion to the current flow state. + * Returns new nodes/edges arrays (immutable). + */ +export function applySuggestion( + suggestion: AiSuggestion, + nodes: Node[], + edges: Edge[], +): ApplyResult { + switch (suggestion.type) { + case "modify_config": + return applyModifyConfig(suggestion, nodes, edges); + case "add_component": + return applyAddComponent(suggestion, nodes, edges); + case "remove_component": + return applyRemoveComponent(suggestion, nodes, edges); + case "modify_connections": + return applyModifyConnections(suggestion, nodes, edges); + default: + return { nodes, edges, error: "Unknown suggestion type" }; + } +} + +function findNodeByComponentKey(nodes: Node[], componentKey: string): Node | undefined { + return nodes.find((n) => (n.data as Record).componentKey === componentKey); +} + +/** Deep-set a value at a dot-notation path, returning a shallow-cloned object tree. */ +function setAtPath(obj: Record, path: string, value: unknown): Record { + if (!path.includes(".")) { + return { ...obj, [path]: value }; + } + const [head, ...rest] = path.split("."); + const child = (obj[head] ?? 
{}) as Record; + return { ...obj, [head]: setAtPath(child, rest.join("."), value) }; +} + +function applyModifyConfig( + suggestion: AiSuggestion & { type: "modify_config" }, + nodes: Node[], + edges: Edge[], +): ApplyResult { + const target = findNodeByComponentKey(nodes, suggestion.componentKey); + if (!target) { + return { nodes, edges, error: `Component "${suggestion.componentKey}" not found` }; + } + + const existingConfig = (target.data as Record).config as Record; + let newConfig = { ...existingConfig }; + for (const [key, value] of Object.entries(suggestion.changes)) { + newConfig = setAtPath(newConfig, key, value); + } + + const newNodes = nodes.map((n) => + n.id === target.id + ? { ...n, data: { ...n.data, config: newConfig } } + : n, + ); + + return { nodes: newNodes, edges }; +} + +function applyAddComponent( + suggestion: AiSuggestion & { type: "add_component" }, + nodes: Node[], + edges: Edge[], +): ApplyResult { + const { component, insertAfter, connectTo } = suggestion; + + const componentDef = findComponentDef(component.componentType, component.kind); + if (!componentDef) { + return { nodes, edges, error: `Unknown component type "${component.componentType}"` }; + } + + const afterNode = findNodeByComponentKey(nodes, insertAfter); + if (!afterNode) { + return { nodes, edges, error: `Component "${insertAfter}" not found for insertAfter` }; + } + + const position = { + x: afterNode.position.x, + y: afterNode.position.y + 150, + }; + + const newNodeId = generateId(); + const newComponentKey = generateComponentKey(component.componentType); + + const newNode: Node = { + id: newNodeId, + type: component.kind, + position, + data: { + componentDef, + componentKey: newComponentKey, + displayName: componentDef.displayName, + config: component.config, + }, + }; + + let newEdges = [...edges]; + + // Add edge: afterNode to newNode (once, regardless of connectTo count) + newEdges.push({ id: generateId(), source: afterNode.id, target: newNodeId }); + + for (const 
downstreamKey of connectTo) { + const downstreamNode = findNodeByComponentKey(nodes, downstreamKey); + if (!downstreamNode) continue; + + // Remove existing edge from afterNode to downstream + newEdges = newEdges.filter( + (e) => !(e.source === afterNode.id && e.target === downstreamNode.id), + ); + + // Add edge: newNode to downstream + newEdges.push({ id: generateId(), source: newNodeId, target: downstreamNode.id }); + } + + return { nodes: [...nodes, newNode], edges: newEdges }; +} + +function applyRemoveComponent( + suggestion: AiSuggestion & { type: "remove_component" }, + nodes: Node[], + edges: Edge[], +): ApplyResult { + const target = findNodeByComponentKey(nodes, suggestion.componentKey); + if (!target) { + return { nodes, edges, error: `Component "${suggestion.componentKey}" not found` }; + } + + if ((target.data as Record).isSystemLocked) { + return { nodes, edges, error: `Component "${suggestion.componentKey}" is system-locked` }; + } + + const incomingEdges = edges.filter((e) => e.target === target.id); + const outgoingEdges = edges.filter((e) => e.source === target.id); + + const newEdges = edges.filter((e) => e.source !== target.id && e.target !== target.id); + const newNodes = nodes.filter((n) => n.id !== target.id); + + if (suggestion.reconnect) { + for (const incoming of incomingEdges) { + for (const outgoing of outgoingEdges) { + newEdges.push({ + id: generateId(), + source: incoming.source, + target: outgoing.target, + }); + } + } + } + + return { nodes: newNodes, edges: newEdges }; +} + +function applyModifyConnections( + suggestion: AiSuggestion & { type: "modify_connections" }, + nodes: Node[], + edges: Edge[], +): ApplyResult { + let newEdges = [...edges]; + + for (const change of suggestion.edgeChanges) { + const fromNode = findNodeByComponentKey(nodes, change.from); + const toNode = findNodeByComponentKey(nodes, change.to); + + if (!fromNode || !toNode) { + return { + nodes, + edges, + error: `Component "${!fromNode ? 
change.from : change.to}" not found`, + }; + } + + if (change.action === "add") { + const exists = newEdges.some( + (e) => e.source === fromNode.id && e.target === toNode.id, + ); + if (!exists) { + newEdges.push({ id: generateId(), source: fromNode.id, target: toNode.id }); + } + } else { + newEdges = newEdges.filter( + (e) => !(e.source === fromNode.id && e.target === toNode.id), + ); + } + } + + return { nodes, edges: newEdges }; +} diff --git a/src/lib/ai/suggestion-validator.ts b/src/lib/ai/suggestion-validator.ts new file mode 100644 index 00000000..2fc05979 --- /dev/null +++ b/src/lib/ai/suggestion-validator.ts @@ -0,0 +1,101 @@ +// src/lib/ai/suggestion-validator.ts +import type { Node } from "@xyflow/react"; +import type { AiSuggestion, AiReviewResponse, SuggestionStatus } from "./types"; + +/** + * Validate a parsed AI response. Returns the response if valid, null if not. + */ +export function parseAiReviewResponse(raw: string): AiReviewResponse | null { + try { + const parsed = JSON.parse(raw); + if ( + typeof parsed === "object" && + parsed !== null && + typeof parsed.summary === "string" && + Array.isArray(parsed.suggestions) + ) { + return parsed as AiReviewResponse; + } + return null; + } catch { + return null; + } +} + +/** + * Validate that suggestion references exist on the canvas. + */ +export function validateSuggestions( + suggestions: AiSuggestion[], + nodes: Node[], +): Map { + const componentKeys = new Set( + nodes.map((n) => (n.data as Record).componentKey as string), + ); + + const statuses = new Map(); + + for (const s of suggestions) { + const referencedKeys = getReferencedKeys(s); + const allValid = referencedKeys.every((k) => componentKeys.has(k)); + statuses.set(s.id, allValid ? "actionable" : "invalid"); + } + + return statuses; +} + +/** + * Determine which suggestions are outdated by comparing pipeline YAML snapshots. 
+ */ +export function detectOutdatedSuggestions( + suggestions: AiSuggestion[], + snapshotYaml: string | null, + currentYaml: string, +): Set { + if (!snapshotYaml || snapshotYaml === currentYaml) { + return new Set(); + } + + const outdated = new Set(); + + for (const s of suggestions) { + const keys = getReferencedKeys(s); + for (const key of keys) { + const snapshotBlock = extractComponentBlock(snapshotYaml, key); + const currentBlock = extractComponentBlock(currentYaml, key); + if (snapshotBlock !== currentBlock) { + outdated.add(s.id); + break; + } + } + } + + return outdated; +} + +function getReferencedKeys(s: AiSuggestion): string[] { + switch (s.type) { + case "modify_config": + return [s.componentKey]; + case "add_component": + return [s.insertAfter, ...s.connectTo]; + case "remove_component": + return [s.componentKey]; + case "modify_connections": + return s.edgeChanges.flatMap((e) => [e.from, e.to]); + } +} + +function extractComponentBlock(yaml: string, componentKey: string): string | null { + const escaped = componentKey.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); + const regex = new RegExp(`^ ${escaped}:\\s*$`, "m"); + const match = regex.exec(yaml); + if (!match) return null; + + const start = match.index; + const rest = yaml.slice(start + match[0].length); + const nextKey = rest.search(/^\s{2}\S/m); + const end = nextKey === -1 ? 
yaml.length : start + match[0].length + nextKey; + + return yaml.slice(start, end).trim(); +} diff --git a/src/lib/ai/types.ts b/src/lib/ai/types.ts new file mode 100644 index 00000000..b267abf3 --- /dev/null +++ b/src/lib/ai/types.ts @@ -0,0 +1,55 @@ +// src/lib/ai/types.ts + +export interface AiSuggestionBase { + id: string; + title: string; + description: string; + priority: "high" | "medium" | "low"; +} + +export interface ModifyConfigSuggestion { + type: "modify_config"; + componentKey: string; + changes: Record; +} + +export interface AddComponentSuggestion { + type: "add_component"; + component: { + key: string; + componentType: string; + kind: "source" | "transform" | "sink"; + config: Record; + }; + insertAfter: string; + connectTo: string[]; +} + +export interface RemoveComponentSuggestion { + type: "remove_component"; + componentKey: string; + reconnect: boolean; +} + +export interface ModifyConnectionsSuggestion { + type: "modify_connections"; + edgeChanges: Array<{ + action: "add" | "remove"; + from: string; + to: string; + }>; +} + +export type AiSuggestion = + | (AiSuggestionBase & ModifyConfigSuggestion) + | (AiSuggestionBase & AddComponentSuggestion) + | (AiSuggestionBase & RemoveComponentSuggestion) + | (AiSuggestionBase & ModifyConnectionsSuggestion); + +export interface AiReviewResponse { + summary: string; + suggestions: AiSuggestion[]; +} + +/** State of a suggestion in the UI */ +export type SuggestionStatus = "actionable" | "applied" | "outdated" | "invalid"; diff --git a/src/server/routers/ai.ts b/src/server/routers/ai.ts new file mode 100644 index 00000000..f7a4d9eb --- /dev/null +++ b/src/server/routers/ai.ts @@ -0,0 +1,86 @@ +import { z } from "zod"; +import { TRPCError } from "@trpc/server"; +import { router, protectedProcedure, withTeamAccess } from "@/trpc/init"; +import { prisma } from "@/lib/prisma"; +import { withAudit } from "@/server/middleware/audit"; +import { Prisma } from "@/generated/prisma"; + +export const aiRouter = 
router({ + getConversation: protectedProcedure + .input(z.object({ pipelineId: z.string() })) + .use(withTeamAccess("VIEWER")) + .query(async ({ input }) => { + const conversation = await prisma.aiConversation.findFirst({ + where: { pipelineId: input.pipelineId }, + orderBy: { createdAt: "desc" }, + include: { + messages: { + orderBy: { createdAt: "asc" }, + include: { + createdBy: { select: { id: true, name: true, image: true } }, + }, + }, + }, + }); + return conversation; + }), + + startNewConversation: protectedProcedure + .input(z.object({ pipelineId: z.string() })) + .use(withTeamAccess("EDITOR")) + .use(withAudit("pipeline.ai_conversation_started", "Pipeline")) + .mutation(async ({ input, ctx }) => { + const conversation = await prisma.aiConversation.create({ + data: { + pipelineId: input.pipelineId, + createdById: ctx.session.user.id, + }, + }); + return conversation; + }), + + markSuggestionsApplied: protectedProcedure + .input( + z.object({ + pipelineId: z.string(), + conversationId: z.string(), + messageId: z.string(), + suggestionIds: z.array(z.string()), + }), + ) + .use(withTeamAccess("EDITOR")) + .use(withAudit("pipeline.ai_suggestion_applied", "Pipeline")) + .mutation(async ({ input, ctx }) => { + return prisma.$transaction(async (tx) => { + const message = await tx.aiMessage.findUnique({ + where: { id: input.messageId }, + include: { + conversation: { select: { pipelineId: true } }, + }, + }); + + if ( + !message || + message.conversationId !== input.conversationId || + message.conversation.pipelineId !== input.pipelineId + ) { + throw new TRPCError({ code: "NOT_FOUND", message: "Message not found in conversation" }); + } + + // Mark suggestions as applied in the JSON + const suggestions = (message.suggestions as Array>) ?? []; + const updatedSuggestions = suggestions.map((s) => + input.suggestionIds.includes(s.id as string) + ? 
{ ...s, appliedAt: new Date().toISOString(), appliedById: ctx.session.user.id } + : s, + ); + + await tx.aiMessage.update({ + where: { id: input.messageId }, + data: { suggestions: updatedSuggestions as unknown as Prisma.InputJsonValue }, + }); + + return { applied: input.suggestionIds.length }; + }); + }), +}); diff --git a/src/server/services/ai.ts b/src/server/services/ai.ts index b29aa182..0d81a16e 100644 --- a/src/server/services/ai.ts +++ b/src/server/services/ai.ts @@ -53,7 +53,7 @@ function validateBaseUrl(baseUrl: string): void { interface StreamCompletionParams { teamId: string; systemPrompt: string; - userPrompt: string; + messages: Array<{ role: "user" | "assistant"; content: string }>; onToken: (token: string) => void; signal?: AbortSignal; } @@ -102,7 +102,7 @@ export async function getTeamAiConfig(teamId: string, { requireEnabled = true } export async function streamCompletion({ teamId, systemPrompt, - userPrompt, + messages, onToken, signal, }: StreamCompletionParams): Promise { @@ -126,8 +126,8 @@ export async function streamCompletion({ model: config.model, stream: true, messages: [ - { role: "system", content: systemPrompt }, - { role: "user", content: userPrompt }, + { role: "system" as const, content: systemPrompt }, + ...messages.map((m) => ({ role: m.role as "user" | "assistant", content: m.content })), ], }), signal, diff --git a/src/stores/flow-store.ts b/src/stores/flow-store.ts index 5de01e73..1386a351 100644 --- a/src/stores/flow-store.ts +++ b/src/stores/flow-store.ts @@ -1,6 +1,8 @@ import { create } from "zustand"; import { generateId } from "@/lib/utils"; import { generateComponentKey } from "@/lib/component-key"; +import { applySuggestion } from "@/lib/ai/suggestion-applier"; +import type { AiSuggestion } from "@/lib/ai/types"; import { type Node, type Edge, @@ -112,6 +114,9 @@ export interface FlowState { // Dirty tracking markClean: () => void; + // AI suggestions + applySuggestions: (suggestions: AiSuggestion[]) => { applied: 
number; errors: string[] }; + // Serialization loadGraph: (nodes: Node[], edges: Edge[], globalConfig?: Record | null, options?: { isSystem?: boolean }) => void; clearGraph: () => void; @@ -790,6 +795,41 @@ export const useFlowStore = create()((set, get) => ({ set({ isDirty: false, _savedSnapshot: snapshot } as Partial); }, + /* ---- AI suggestions ---- */ + + applySuggestions: (suggestions) => { + const errors: string[] = []; + let applied = 0; + + set((state) => { + let { nodes, edges } = state; + + for (const suggestion of suggestions) { + const result = applySuggestion(suggestion, nodes, edges); + if (result.error) { + errors.push(result.error); + } else { + nodes = result.nodes; + edges = result.edges; + applied++; + } + } + + // Only push an undo snapshot when something actually changed + if (applied === 0) return {}; + + const history = pushSnapshot(state); + return { + ...history, + nodes, + edges, + isDirty: true, + }; + }); + + return { applied, errors }; + }, + /* ---- Serialization ---- */ loadGraph: (nodes, edges, globalConfig, options) => { diff --git a/src/trpc/router.ts b/src/trpc/router.ts index dc0871a1..3ff20276 100644 --- a/src/trpc/router.ts +++ b/src/trpc/router.ts @@ -20,6 +20,7 @@ import { alertRouter } from "@/server/routers/alert"; import { serviceAccountRouter } from "@/server/routers/service-account"; import { userPreferenceRouter } from "@/server/routers/user-preference"; import { sharedComponentRouter } from "@/server/routers/shared-component"; +import { aiRouter } from "@/server/routers/ai"; export const appRouter = router({ team: teamRouter, @@ -43,6 +44,7 @@ export const appRouter = router({ serviceAccount: serviceAccountRouter, userPreference: userPreferenceRouter, sharedComponent: sharedComponentRouter, + ai: aiRouter, }); export type AppRouter = typeof appRouter;