Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- AlterTable
ALTER TABLE "AiConversation" ADD COLUMN "componentKey" TEXT;

-- AlterTable
ALTER TABLE "AiMessage" ADD COLUMN "vrlCode" TEXT;

-- CreateIndex
CREATE INDEX "AiConversation_pipelineId_componentKey_idx" ON "AiConversation"("pipelineId", "componentKey");
19 changes: 11 additions & 8 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -775,16 +775,18 @@ model ServiceAccount {
}

model AiConversation {
id String @id @default(cuid())
pipelineId String
pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade)
createdById String?
createdBy User? @relation("AiConversationCreatedBy", fields: [createdById], references: [id], onDelete: SetNull)
messages AiMessage[]
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
id String @id @default(cuid())
pipelineId String
pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade)
componentKey String?
createdById String?
createdBy User? @relation("AiConversationCreatedBy", fields: [createdById], references: [id], onDelete: SetNull)
messages AiMessage[]
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

@@index([pipelineId, createdAt])
@@index([pipelineId, componentKey])
}

model AiMessage {
Expand All @@ -795,6 +797,7 @@ model AiMessage {
content String
suggestions Json?
pipelineYaml String?
vrlCode String?
createdById String?
createdBy User? @relation("AiMessageCreatedBy", fields: [createdById], references: [id], onDelete: SetNull)
createdAt DateTime @default(now())
Expand Down
246 changes: 246 additions & 0 deletions src/app/api/ai/vrl-chat/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
export const runtime = "nodejs";

import { auth } from "@/auth";
import { prisma } from "@/lib/prisma";
import { streamCompletion } from "@/server/services/ai";
import { buildVrlChatSystemPrompt } from "@/lib/ai/prompts";
import { writeAuditLog } from "@/server/services/audit";
import type { VrlChatResponse } from "@/lib/ai/vrl-suggestion-types";
import { Prisma } from "@/generated/prisma";

export async function POST(request: Request) {
const session = await auth();
if (!session?.user?.id) {
return new Response(JSON.stringify({ error: "Unauthorized" }), {
status: 401,
headers: { "Content-Type": "application/json" },
});
}

let body: {
teamId: string;
prompt: string;
currentCode?: string;
fields?: { name: string; type: string }[];
componentType?: string;
sourceTypes?: string[];
pipelineId: string;
componentKey: string;
conversationId?: string;
};

try {
body = await request.json();
} catch {
return new Response(JSON.stringify({ error: "Invalid JSON" }), {
status: 400,
headers: { "Content-Type": "application/json" },
});
}

if (!body.teamId || !body.prompt || !body.pipelineId || !body.componentKey) {
return new Response(
JSON.stringify({ error: "teamId, prompt, pipelineId, and componentKey are required" }),
{ status: 400, headers: { "Content-Type": "application/json" } },
);
}

// Verify user is at least EDITOR on this team
const membership = await prisma.teamMember.findUnique({
where: { userId_teamId: { userId: session.user.id, teamId: body.teamId } },
});
const user = await prisma.user.findUnique({
where: { id: session.user.id },
select: { isSuperAdmin: true },
});

if (!membership && !user?.isSuperAdmin) {
return new Response(JSON.stringify({ error: "Forbidden" }), {
status: 403,
headers: { "Content-Type": "application/json" },
});
}
if (membership && membership.role === "VIEWER" && !user?.isSuperAdmin) {
return new Response(JSON.stringify({ error: "EDITOR role required" }), {
status: 403,
headers: { "Content-Type": "application/json" },
});
}

// Verify pipelineId belongs to the team
const pipeline = await prisma.pipeline.findUnique({
where: { id: body.pipelineId },
select: { environmentId: true, environment: { select: { teamId: true } } },
});
if (!pipeline || pipeline.environment.teamId !== body.teamId) {
return new Response(JSON.stringify({ error: "Pipeline not found" }), {
status: 404,
headers: { "Content-Type": "application/json" },
});
}

// --- Conversation persistence ---
let conversationId = body.conversationId;
let priorMessages: Array<{ role: "user" | "assistant"; content: string }> = [];

if (!conversationId) {
// Reuse existing conversation for this pipeline + component if one exists
const existing = await prisma.aiConversation.findFirst({
where: { pipelineId: body.pipelineId, componentKey: body.componentKey },
orderBy: { createdAt: "desc" },
select: { id: true },
});
if (existing) {
conversationId = existing.id;
} else {
const conversation = await prisma.aiConversation.create({
data: {
pipelineId: body.pipelineId,
componentKey: body.componentKey,
createdById: session.user.id,
},
});
conversationId = conversation.id;
}
} else {
// Verify conversationId belongs to this pipeline + component
const existing = await prisma.aiConversation.findUnique({
where: { id: conversationId },
select: { pipelineId: true, componentKey: true },
});
if (
!existing ||
existing.pipelineId !== body.pipelineId ||
existing.componentKey !== body.componentKey
) {
return new Response(JSON.stringify({ error: "Conversation not found" }), {
status: 404,
headers: { "Content-Type": "application/json" },
});
}
}

// Save user message
await prisma.aiMessage.create({
data: {
conversationId,
role: "user",
content: body.prompt,
vrlCode: body.currentCode ?? null,
createdById: session.user.id,
},
});

// Get most recent 10 messages (desc) then reverse to chronological order
const history = await prisma.aiMessage.findMany({
where: { conversationId },
orderBy: { createdAt: "desc" },
take: 10,
select: { role: true, content: true },
});
history.reverse();

// Exclude the message we just saved (last user msg) — it goes as the current prompt
priorMessages = history.slice(0, -1).map((m) => ({
role: m.role as "user" | "assistant",
content: m.content,
}));

const messages: Array<{ role: "user" | "assistant"; content: string }> = [
...priorMessages,
{ role: "user", content: body.prompt },
];

const systemPrompt = buildVrlChatSystemPrompt({
fields: body.fields,
currentCode: body.currentCode,
componentType: body.componentType,
sourceTypes: body.sourceTypes,
});

const encoder = new TextEncoder();
let fullResponse = "";

const stream = new ReadableStream({
async start(controller) {
try {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ conversationId })}\n\n`),
);

await streamCompletion({
teamId: body.teamId,
systemPrompt,
messages,
onToken: (token) => {
fullResponse += token;
const data = JSON.stringify({ token });
controller.enqueue(encoder.encode(`data: ${data}\n\n`));
},
signal: request.signal,
});

// Persist assistant response
let parsedSuggestions = null;
try {
const parsed: VrlChatResponse = JSON.parse(fullResponse);
if (parsed.summary && Array.isArray(parsed.suggestions)) {
parsedSuggestions = parsed.suggestions;
}
} catch {
// Not valid JSON — store as raw text
}

try {
await prisma.aiMessage.create({
data: {
conversationId: conversationId!,
role: "assistant",
content: fullResponse,
suggestions: (parsedSuggestions as unknown as Prisma.InputJsonValue) ?? undefined,
vrlCode: body.currentCode ?? null,
createdById: session.user.id,
},
});
} catch (err) {
console.error("Failed to persist VRL AI response:", err);
}

writeAuditLog({
userId: session.user.id,
action: "pipeline.vrl_ai_chat",
entityType: "Pipeline",
entityId: body.pipelineId,
metadata: {
conversationId,
componentKey: body.componentKey,
suggestionCount: parsedSuggestions?.length ?? 0,
},
teamId: pipeline.environment.teamId,
environmentId: pipeline.environmentId,
userEmail: session.user.email ?? null,
userName: session.user.name ?? null,
}).catch(() => {});

controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`));
} catch (err) {
if (!request.signal.aborted) {
const message = err instanceof Error ? err.message : "AI request failed";
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ error: message })}\n\n`),
);
}
} finally {
controller.close();
}
},
});

return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
3 changes: 3 additions & 0 deletions src/components/flow/detail-panel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ export function DetailPanel({ pipelineId, isDeployed }: DetailPanelProps) {
onChange={(v) => handleConfigChange({ ...config, source: v })}
sourceTypes={upstream.sourceTypes}
pipelineId={pipelineId}
componentKey={componentKey}
upstreamSourceKeys={upstream.sourceKeys}
/>
</div>
Expand All @@ -475,6 +476,7 @@ export function DetailPanel({ pipelineId, isDeployed }: DetailPanelProps) {
onChange={(v) => handleConfigChange({ ...config, condition: v })}
sourceTypes={upstream.sourceTypes}
pipelineId={pipelineId}
componentKey={componentKey}
upstreamSourceKeys={upstream.sourceKeys}
/>
</div>
Expand Down Expand Up @@ -505,6 +507,7 @@ export function DetailPanel({ pipelineId, isDeployed }: DetailPanelProps) {
height="120px"
sourceTypes={upstream.sourceTypes}
pipelineId={pipelineId}
componentKey={componentKey}
upstreamSourceKeys={upstream.sourceKeys}
/>
</div>
Expand Down
Loading
Loading