diff --git a/docs/public/SUMMARY.md b/docs/public/SUMMARY.md
index 0502a1e7..b2885024 100644
--- a/docs/public/SUMMARY.md
+++ b/docs/public/SUMMARY.md
@@ -20,6 +20,7 @@
* [Alerts](user-guide/alerts.md)
* [Templates](user-guide/templates.md)
* [Shared Components](user-guide/shared-components.md)
+* [AI Suggestions](user-guide/ai-suggestions.md)
## Operations
diff --git a/docs/public/user-guide/ai-suggestions.md b/docs/public/user-guide/ai-suggestions.md
new file mode 100644
index 00000000..e04241c5
--- /dev/null
+++ b/docs/public/user-guide/ai-suggestions.md
@@ -0,0 +1,64 @@
+# AI Suggestions
+
+VectorFlow includes optional AI-powered assistance for writing VRL code and generating pipeline configurations. When enabled, team members with Editor or Admin roles can use AI features in both the VRL editor and pipeline builder.
+
+## Setup
+
+Team admins can configure AI in **Settings → AI**. The configuration requires:
+
+| Field | Description |
+|-------|-------------|
+| **Provider** | OpenAI, Anthropic, or Custom (any OpenAI-compatible endpoint) |
+| **Base URL** | API endpoint — pre-filled for known providers |
+| **API Key** | Provider API key — encrypted at rest using AES-256-GCM |
+| **Model** | Model identifier (e.g. `gpt-4o`, `claude-sonnet-4-20250514`) |
+
+After saving, use **Test Connection** to verify the configuration works.
+
+{% hint style="info" %}
+VectorFlow uses the OpenAI-compatible chat completions API format (`/chat/completions`). Most providers support this format natively — Anthropic exposes an OpenAI-compatible endpoint — and others can be reached through a compatibility layer such as LiteLLM or OpenRouter.
+{% endhint %}
+
+## VRL Assistant
+
+In the VRL editor (opened from any remap, filter, or route transform), click the **AI** button in the tools panel to reveal the AI input.
+
+1. Type a natural language description of what you want the VRL code to do
+2. Click **Generate** — the AI streams VRL code in real time
+3. When complete, choose:
+ - **Insert** — append the generated code after your existing code
+ - **Replace** — replace all existing code with the generated result
+ - **Regenerate** — try again with the same prompt
+
+The AI is aware of your upstream source types and available fields, so you can reference them naturally (e.g., "parse the syslog message and extract the hostname").
+
+## Pipeline Builder
+
+In the pipeline editor toolbar, click the **sparkle icon** to open the AI Pipeline Builder dialog.
+
+### Generate mode
+
+Describe a pipeline in plain language:
+
+> "Collect Kubernetes logs from a file source, drop debug-level events, parse JSON, and send to Elasticsearch and S3"
+
+The AI generates a complete Vector YAML configuration. Click **Apply to Canvas** to add the generated components to your pipeline. If your canvas already has components, the new ones are positioned below the existing layout to avoid overlap.
+
+### Review mode
+
+Ask the AI to analyze your current pipeline configuration:
+
+> "Are there any performance issues with my pipeline?"
+
+The AI reviews the generated YAML and provides suggestions for improvements, best practices, and potential issues.
+
+## Rate Limits
+
+AI requests are limited to 60 per team over a rolling one-hour window to prevent excessive provider API usage; older requests age out of the window continuously rather than the count resetting at a fixed time.
+
+## Security
+
+- API keys are encrypted at rest using AES-256-GCM
+- Keys are never exposed to the client — the settings page shows only whether a key is saved
+- AI configuration changes are recorded in the audit log with the API key redacted
+- Only team members with Editor or Admin roles can use AI features
diff --git a/docs/public/user-guide/pipeline-editor.md b/docs/public/user-guide/pipeline-editor.md
index 1dfe6fc5..552a333f 100644
--- a/docs/public/user-guide/pipeline-editor.md
+++ b/docs/public/user-guide/pipeline-editor.md
@@ -257,3 +257,19 @@ Click the pipeline name in the top-left corner of the editor to rename it inline
{% hint style="info" %}
On Windows and Linux, use `Ctrl` instead of `Cmd` for all keyboard shortcuts.
{% endhint %}
+
+## AI-Powered Suggestions
+
+When AI is configured for your team (Settings → AI), two AI features become available:
+
+### VRL Assistant
+In the VRL editor, click the **AI** button in the tools panel. Type a natural language description of what you want the VRL code to do, and the AI will generate VRL code. You can **Insert** (append) or **Replace** the current code.
+
+### Pipeline Builder
+In the pipeline editor toolbar, click the **sparkle icon** to open the AI Pipeline Builder. Two modes are available:
+
+- **Generate**: Describe a pipeline in plain language (e.g., "Collect K8s logs, drop debug, send to Datadog"). The AI generates Vector YAML config that is applied directly to your canvas.
+- **Review**: Ask the AI to review your current pipeline configuration for performance, correctness, and best practices.
+
+### Configuration
+Team admins can configure AI in **Settings → AI**. VectorFlow supports any OpenAI-compatible API (OpenAI, Anthropic, Ollama, Groq, Together, etc.).
diff --git a/prisma/migrations/20260310020000_add_ai_fields/migration.sql b/prisma/migrations/20260310020000_add_ai_fields/migration.sql
new file mode 100644
index 00000000..fec51e28
--- /dev/null
+++ b/prisma/migrations/20260310020000_add_ai_fields/migration.sql
@@ -0,0 +1,6 @@
+-- AlterTable: Add AI configuration fields to Team
+ALTER TABLE "Team" ADD COLUMN "aiProvider" TEXT;
+ALTER TABLE "Team" ADD COLUMN "aiBaseUrl" TEXT;
+ALTER TABLE "Team" ADD COLUMN "aiApiKey" TEXT;
+ALTER TABLE "Team" ADD COLUMN "aiModel" TEXT;
+ALTER TABLE "Team" ADD COLUMN "aiEnabled" BOOLEAN NOT NULL DEFAULT false;
diff --git a/prisma/schema.prisma b/prisma/schema.prisma
index 19eb7b7f..50e3076b 100644
--- a/prisma/schema.prisma
+++ b/prisma/schema.prisma
@@ -66,6 +66,14 @@ model Team {
vrlSnippets VrlSnippet[]
alertRules AlertRule[]
availableTags Json? @default("[]") // string[] of admin-defined classification tags
+
+ // AI-powered suggestions configuration
+ aiProvider String? // "openai" | "anthropic" | "custom"
+ aiBaseUrl String? // OpenAI-compatible API endpoint
+ aiApiKey String? // Encrypted via crypto.ts
+ aiModel String? // e.g. "gpt-4o", "claude-sonnet-4-20250514"
+ aiEnabled Boolean @default(false)
+
createdAt DateTime @default(now())
}
diff --git a/src/app/(dashboard)/pipelines/[id]/page.tsx b/src/app/(dashboard)/pipelines/[id]/page.tsx
index 45eba29f..1bebf069 100644
--- a/src/app/(dashboard)/pipelines/[id]/page.tsx
+++ b/src/app/(dashboard)/pipelines/[id]/page.tsx
@@ -30,12 +30,14 @@ import {
import { ComponentPalette } from "@/components/flow/component-palette";
import { FlowCanvas } from "@/components/flow/flow-canvas";
import { FlowToolbar } from "@/components/flow/flow-toolbar";
+import { AiPipelineDialog } from "@/components/flow/ai-pipeline-dialog";
import { DetailPanel } from "@/components/flow/detail-panel";
import { DeployDialog } from "@/components/flow/deploy-dialog";
import { SaveTemplateDialog } from "@/components/flow/save-template-dialog";
import { ConfirmDialog } from "@/components/confirm-dialog";
import { PipelineMetricsChart } from "@/components/pipeline/metrics-chart";
import { PipelineLogs } from "@/components/pipeline/pipeline-logs";
+import { useTeamStore } from "@/stores/team-store";
function aggregateProcessStatus(
statuses: Array<{ status: string }>
@@ -130,6 +132,16 @@ function PipelineBuilderInner({ pipelineId }: { pipelineId: string }) {
const [discardOpen, setDiscardOpen] = useState(false);
const [metricsOpen, setMetricsOpen] = useState(false);
const [logsOpen, setLogsOpen] = useState(false);
+ const [aiDialogOpen, setAiDialogOpen] = useState(false);
+
+ const selectedTeamId = useTeamStore((s) => s.selectedTeamId);
+ const teamQuery = useQuery(
+ trpc.team.get.queryOptions(
+ { id: selectedTeamId! },
+ { enabled: !!selectedTeamId },
+ ),
+ );
+ const aiEnabled = teamQuery.data?.aiEnabled ?? false;
const loadGraph = useFlowStore((s) => s.loadGraph);
const isDirty = useFlowStore((s) => s.isDirty);
@@ -431,6 +443,8 @@ function PipelineBuilderInner({ pipelineId }: { pipelineId: string }) {
}
gitOpsMode={pipelineQuery.data?.gitOpsMode}
onDiscardChanges={() => setDiscardOpen(true)}
+ aiEnabled={aiEnabled}
+ onAiOpen={() => setAiDialogOpen(true)}
/>
@@ -522,6 +536,13 @@ function PipelineBuilderInner({ pipelineId }: { pipelineId: string }) {
+ {aiEnabled && (
+
+ )}
);
}
diff --git a/src/app/(dashboard)/settings/_components/ai-settings.tsx b/src/app/(dashboard)/settings/_components/ai-settings.tsx
new file mode 100644
index 00000000..4230e90e
--- /dev/null
+++ b/src/app/(dashboard)/settings/_components/ai-settings.tsx
@@ -0,0 +1,269 @@
+"use client";
+
+import { useState } from "react";
+import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query";
+import { useTRPC } from "@/trpc/client";
+import { useTeamStore } from "@/stores/team-store";
+import { toast } from "sonner";
+import { Sparkles, Loader2, CheckCircle, XCircle } from "lucide-react";
+
+import { Button } from "@/components/ui/button";
+import { Input } from "@/components/ui/input";
+import { Label } from "@/components/ui/label";
+import {
+ Card,
+ CardContent,
+ CardDescription,
+ CardHeader,
+ CardTitle,
+} from "@/components/ui/card";
+import {
+ Select,
+ SelectContent,
+ SelectItem,
+ SelectTrigger,
+ SelectValue,
+} from "@/components/ui/select";
+import { Switch } from "@/components/ui/switch";
+import { Skeleton } from "@/components/ui/skeleton";
+import { Badge } from "@/components/ui/badge";
+
+const PROVIDER_DEFAULTS: Record = {
+ openai: { baseUrl: "https://api.openai.com/v1", placeholder: "gpt-4o" },
+ anthropic: { baseUrl: "https://api.anthropic.com/v1", placeholder: "claude-sonnet-4-20250514" },
+ custom: { baseUrl: "", placeholder: "model-name" },
+};
+
+interface AiConfig {
+ aiEnabled: boolean;
+ aiProvider: string | null;
+ aiBaseUrl: string | null;
+ aiModel: string | null;
+ hasApiKey: boolean;
+}
+
+function AiSettingsForm({ config, teamId }: { config: AiConfig; teamId: string }) {
+ const trpc = useTRPC();
+ const queryClient = useQueryClient();
+
+ const [provider, setProvider] = useState(config.aiProvider ?? "openai");
+ const [baseUrl, setBaseUrl] = useState(config.aiBaseUrl ?? "");
+ const [apiKey, setApiKey] = useState("");
+ const [model, setModel] = useState(config.aiModel ?? "");
+ const [enabled, setEnabled] = useState(config.aiEnabled);
+ const [testResult, setTestResult] = useState<{ ok: boolean; error?: string } | null>(null);
+
+ const updateMutation = useMutation(
+ trpc.team.updateAiConfig.mutationOptions({
+ onSuccess: () => {
+ queryClient.invalidateQueries({ queryKey: trpc.team.getAiConfig.queryKey() });
+ queryClient.invalidateQueries({ queryKey: trpc.team.get.queryKey() });
+ toast.success("AI configuration saved");
+ },
+ onError: (error) => {
+ toast.error(error.message || "Failed to save AI config");
+ },
+ }),
+ );
+
+ const testMutation = useMutation(
+ trpc.team.testAiConnection.mutationOptions({
+ onSuccess: (result) => {
+ setTestResult(result);
+ if (result.ok) {
+ toast.success("AI connection successful!");
+ } else {
+ toast.error("Connection failed", { description: result.error });
+ }
+ },
+ onError: (error) => {
+ setTestResult({ ok: false, error: error.message });
+ toast.error("Connection test failed", { description: error.message });
+ },
+ }),
+ );
+
+ const handleSave = () => {
+ const data: Record = {
+ teamId,
+ aiEnabled: enabled,
+ aiProvider: provider as "openai" | "anthropic" | "custom",
+ aiBaseUrl: baseUrl || null,
+ aiModel: model || null,
+ };
+ if (apiKey) {
+ data.aiApiKey = apiKey;
+ }
+ updateMutation.mutate(data as Parameters[0]);
+ };
+
+ const handleTest = () => {
+ setTestResult(null);
+ testMutation.mutate({ teamId });
+ };
+
+ const handleProviderChange = (value: string) => {
+ setProvider(value);
+ const defaults = PROVIDER_DEFAULTS[value];
+ if (defaults) {
+ setBaseUrl(defaults.baseUrl);
+ }
+ setTestResult(null);
+ };
+
+ return (
+
+
+
+
+
+ AI-Powered Suggestions
+
+
+ Configure an OpenAI-compatible AI provider for VRL code assistance and
+ pipeline generation. Credentials are encrypted at rest and scoped to this team.
+
+
+
+ {/* Enable/Disable Toggle */}
+
+
+
Enable AI Suggestions
+
+ When enabled, team members with Editor+ role can use AI assistance in the
+ VRL editor and pipeline builder.
+
+
+
+
+
+ {/* Provider Selection */}
+
+ Provider
+
+
+
+
+
+ OpenAI
+ Anthropic
+ Custom (OpenAI-compatible)
+
+
+
+
+ {/* Base URL */}
+
+
Base URL
+
setBaseUrl(e.target.value)}
+ placeholder={PROVIDER_DEFAULTS[provider]?.baseUrl || "https://api.example.com/v1"}
+ />
+
+ OpenAI-compatible API endpoint. Pre-filled for known providers.
+
+
+
+ {/* API Key */}
+
+
API Key
+
setApiKey(e.target.value)}
+ placeholder={config.hasApiKey ? "••••••••••• (saved)" : "sk-..."}
+ />
+
+ {config.hasApiKey
+ ? "A key is already saved. Enter a new value to replace it."
+ : "Encrypted at rest using AES-256."}
+
+
+
+ {/* Model */}
+
+ Model
+ setModel(e.target.value)}
+ placeholder={PROVIDER_DEFAULTS[provider]?.placeholder || "model-name"}
+ />
+
+
+ {/* Actions */}
+
+
+ {updateMutation.isPending ? (
+ <>
+
+ Saving...
+ >
+ ) : (
+ "Save"
+ )}
+
+
+ {testMutation.isPending ? (
+ <>
+
+ Testing...
+ >
+ ) : (
+ "Test Connection"
+ )}
+
+ {testResult && (
+
+ {testResult.ok ? (
+ <> Connected>
+ ) : (
+ <> Failed>
+ )}
+
+ )}
+
+
+
+
+ );
+}
+
+export function AiSettings() {
+ const trpc = useTRPC();
+ const selectedTeamId = useTeamStore((s) => s.selectedTeamId);
+
+ const configQuery = useQuery(
+ trpc.team.getAiConfig.queryOptions(
+ { teamId: selectedTeamId! },
+ { enabled: !!selectedTeamId },
+ ),
+ );
+
+ if (configQuery.isLoading || !configQuery.data) {
+ return (
+
+
+
+
+ );
+ }
+
+ return ;
+}
diff --git a/src/app/(dashboard)/settings/ai/page.tsx b/src/app/(dashboard)/settings/ai/page.tsx
new file mode 100644
index 00000000..23ebda40
--- /dev/null
+++ b/src/app/(dashboard)/settings/ai/page.tsx
@@ -0,0 +1,7 @@
+"use client";
+
+import { AiSettings } from "../_components/ai-settings";
+
+export default function AiPage() {
+ return ;
+}
diff --git a/src/app/api/ai/pipeline/route.ts b/src/app/api/ai/pipeline/route.ts
new file mode 100644
index 00000000..bd4ed1da
--- /dev/null
+++ b/src/app/api/ai/pipeline/route.ts
@@ -0,0 +1,112 @@
+export const runtime = "nodejs";
+
+import { auth } from "@/auth";
+import { prisma } from "@/lib/prisma";
+import { streamCompletion } from "@/server/services/ai";
+import { buildPipelineSystemPrompt } from "@/lib/ai/prompts";
+
+export async function POST(request: Request) {
+ const session = await auth();
+ if (!session?.user?.id) {
+ return new Response(JSON.stringify({ error: "Unauthorized" }), {
+ status: 401,
+ headers: { "Content-Type": "application/json" },
+ });
+ }
+
+ let body: {
+ teamId: string;
+ prompt: string;
+ mode: "generate" | "review";
+ currentYaml?: string;
+ environmentName?: string;
+ };
+
+ try {
+ body = await request.json();
+ } catch {
+ return new Response(JSON.stringify({ error: "Invalid JSON" }), {
+ status: 400,
+ headers: { "Content-Type": "application/json" },
+ });
+ }
+
+ if (!body.teamId || !body.prompt || !body.mode) {
+ return new Response(JSON.stringify({ error: "teamId, prompt, and mode are required" }), {
+ status: 400,
+ headers: { "Content-Type": "application/json" },
+ });
+ }
+
+ if (body.mode !== "generate" && body.mode !== "review") {
+ return new Response(JSON.stringify({ error: "mode must be 'generate' or 'review'" }), {
+ status: 400,
+ headers: { "Content-Type": "application/json" },
+ });
+ }
+
+ // Verify user is at least EDITOR on this team
+ const membership = await prisma.teamMember.findUnique({
+ where: { userId_teamId: { userId: session.user.id, teamId: body.teamId } },
+ });
+ const user = await prisma.user.findUnique({
+ where: { id: session.user.id },
+ select: { isSuperAdmin: true },
+ });
+
+ if (!membership && !user?.isSuperAdmin) {
+ return new Response(JSON.stringify({ error: "Forbidden" }), {
+ status: 403,
+ headers: { "Content-Type": "application/json" },
+ });
+ }
+ if (membership && membership.role === "VIEWER" && !user?.isSuperAdmin) {
+ return new Response(JSON.stringify({ error: "EDITOR role required" }), {
+ status: 403,
+ headers: { "Content-Type": "application/json" },
+ });
+ }
+
+ const mode = body.mode;
+
+ const systemPrompt = buildPipelineSystemPrompt({
+ mode,
+ currentYaml: body.currentYaml,
+ environmentName: body.environmentName,
+ });
+
+ const encoder = new TextEncoder();
+
+ const stream = new ReadableStream({
+ async start(controller) {
+ try {
+ await streamCompletion({
+ teamId: body.teamId,
+ systemPrompt,
+ userPrompt: body.prompt,
+ onToken: (token) => {
+ const data = JSON.stringify({ token });
+ controller.enqueue(encoder.encode(`data: ${data}\n\n`));
+ },
+ signal: request.signal,
+ });
+ controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`));
+ } catch (err) {
+ const message = err instanceof Error ? err.message : "AI request failed";
+ controller.enqueue(
+ encoder.encode(`data: ${JSON.stringify({ error: message })}\n\n`)
+ );
+ } finally {
+ controller.close();
+ }
+ },
+ });
+
+ return new Response(stream, {
+ headers: {
+ "Content-Type": "text/event-stream",
+ "Cache-Control": "no-cache",
+ Connection: "keep-alive",
+ },
+ });
+}
diff --git a/src/app/api/ai/vrl/route.ts b/src/app/api/ai/vrl/route.ts
new file mode 100644
index 00000000..cc8b348c
--- /dev/null
+++ b/src/app/api/ai/vrl/route.ts
@@ -0,0 +1,105 @@
+export const runtime = "nodejs";
+
+import { auth } from "@/auth";
+import { prisma } from "@/lib/prisma";
+import { streamCompletion } from "@/server/services/ai";
+import { buildVrlSystemPrompt } from "@/lib/ai/prompts";
+
+export async function POST(request: Request) {
+ const session = await auth();
+ if (!session?.user?.id) {
+ return new Response(JSON.stringify({ error: "Unauthorized" }), {
+ status: 401,
+ headers: { "Content-Type": "application/json" },
+ });
+ }
+
+ let body: {
+ teamId: string;
+ prompt: string;
+ currentCode?: string;
+ fields?: { name: string; type: string }[];
+ componentType?: string;
+ sourceTypes?: string[];
+ };
+
+ try {
+ body = await request.json();
+ } catch {
+ return new Response(JSON.stringify({ error: "Invalid JSON" }), {
+ status: 400,
+ headers: { "Content-Type": "application/json" },
+ });
+ }
+
+ if (!body.teamId || !body.prompt) {
+ return new Response(JSON.stringify({ error: "teamId and prompt are required" }), {
+ status: 400,
+ headers: { "Content-Type": "application/json" },
+ });
+ }
+
+ // Verify user is at least EDITOR on this team
+ const membership = await prisma.teamMember.findUnique({
+ where: { userId_teamId: { userId: session.user.id, teamId: body.teamId } },
+ });
+ const user = await prisma.user.findUnique({
+ where: { id: session.user.id },
+ select: { isSuperAdmin: true },
+ });
+
+ if (!membership && !user?.isSuperAdmin) {
+ return new Response(JSON.stringify({ error: "Forbidden" }), {
+ status: 403,
+ headers: { "Content-Type": "application/json" },
+ });
+ }
+ if (membership && membership.role === "VIEWER" && !user?.isSuperAdmin) {
+ return new Response(JSON.stringify({ error: "EDITOR role required" }), {
+ status: 403,
+ headers: { "Content-Type": "application/json" },
+ });
+ }
+
+ const systemPrompt = buildVrlSystemPrompt({
+ fields: body.fields,
+ currentCode: body.currentCode,
+ componentType: body.componentType,
+ sourceTypes: body.sourceTypes,
+ });
+
+ const encoder = new TextEncoder();
+
+ const stream = new ReadableStream({
+ async start(controller) {
+ try {
+ await streamCompletion({
+ teamId: body.teamId,
+ systemPrompt,
+ userPrompt: body.prompt,
+ onToken: (token) => {
+ const data = JSON.stringify({ token });
+ controller.enqueue(encoder.encode(`data: ${data}\n\n`));
+ },
+ signal: request.signal,
+ });
+ controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`));
+ } catch (err) {
+ const message = err instanceof Error ? err.message : "AI request failed";
+ controller.enqueue(
+ encoder.encode(`data: ${JSON.stringify({ error: message })}\n\n`)
+ );
+ } finally {
+ controller.close();
+ }
+ },
+ });
+
+ return new Response(stream, {
+ headers: {
+ "Content-Type": "text/event-stream",
+ "Cache-Control": "no-cache",
+ Connection: "keep-alive",
+ },
+ });
+}
diff --git a/src/components/flow/ai-pipeline-dialog.tsx b/src/components/flow/ai-pipeline-dialog.tsx
new file mode 100644
index 00000000..ad9a1214
--- /dev/null
+++ b/src/components/flow/ai-pipeline-dialog.tsx
@@ -0,0 +1,271 @@
+// src/components/flow/ai-pipeline-dialog.tsx
+"use client";
+
+import { useState, useRef, useCallback } from "react";
+import { Loader2, RotateCcw, Sparkles, AlertTriangle } from "lucide-react";
+import { Button } from "@/components/ui/button";
+import { Input } from "@/components/ui/input";
+import { Label } from "@/components/ui/label";
+import {
+ Dialog,
+ DialogContent,
+ DialogDescription,
+ DialogHeader,
+ DialogTitle,
+} from "@/components/ui/dialog";
+import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs";
+import { useTeamStore } from "@/stores/team-store";
+import { useFlowStore } from "@/stores/flow-store";
+import { generateVectorYaml, importVectorConfig } from "@/lib/config-generator";
+import { toast } from "sonner";
+
+interface AiPipelineDialogProps {
+ open: boolean;
+ onOpenChange: (open: boolean) => void;
+ environmentName?: string;
+}
+
+export function AiPipelineDialog({
+ open,
+ onOpenChange,
+ environmentName,
+}: AiPipelineDialogProps) {
+ const [prompt, setPrompt] = useState("");
+ const [result, setResult] = useState("");
+ const [isStreaming, setIsStreaming] = useState(false);
+ const [error, setError] = useState(null);
+ const [mode, setMode] = useState<"generate" | "review">("generate");
+ const abortRef = useRef(null);
+ const selectedTeamId = useTeamStore((s) => s.selectedTeamId);
+ const nodes = useFlowStore((s) => s.nodes);
+ const edges = useFlowStore((s) => s.edges);
+ const globalConfig = useFlowStore((s) => s.globalConfig);
+ const loadGraph = useFlowStore((s) => s.loadGraph);
+
+ const currentYaml = nodes.length > 0
+ ? generateVectorYaml(nodes, edges, globalConfig)
+ : undefined;
+
+ const handleSubmit = useCallback(
+ async (e?: React.FormEvent) => {
+ e?.preventDefault();
+ if (!prompt.trim() || !selectedTeamId || isStreaming) return;
+
+ setIsStreaming(true);
+ setResult("");
+ setError(null);
+
+ abortRef.current = new AbortController();
+
+ try {
+ const response = await fetch("/api/ai/pipeline", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({
+ teamId: selectedTeamId,
+ prompt: prompt.trim(),
+ mode,
+ currentYaml: mode === "review" ? currentYaml : undefined,
+ environmentName,
+ }),
+ signal: abortRef.current.signal,
+ });
+
+ if (!response.ok) {
+ const errData = await response.json().catch(() => ({ error: "Request failed" }));
+ throw new Error(errData.error || `HTTP ${response.status}`);
+ }
+
+ const reader = response.body?.getReader();
+ if (!reader) throw new Error("No response stream");
+
+ const decoder = new TextDecoder();
+ let buffer = "";
+
+ while (true) {
+ const { done, value } = await reader.read();
+ if (done) break;
+
+ buffer += decoder.decode(value, { stream: true });
+ const lines = buffer.split("\n");
+ buffer = lines.pop() ?? "";
+
+ for (const line of lines) {
+ const trimmed = line.trim();
+ if (!trimmed || !trimmed.startsWith("data: ")) continue;
+
+ try {
+ const data = JSON.parse(trimmed.slice(6));
+ if (data.done) break;
+ if (data.error) throw new Error(data.error);
+ if (data.token) {
+ setResult((prev) => prev + data.token);
+ }
+ } catch (parseErr) {
+ if (parseErr instanceof Error && parseErr.message !== "Unexpected end of JSON input") {
+ throw parseErr;
+ }
+ }
+ }
+ }
+ } catch (err) {
+ if (err instanceof Error && err.name === "AbortError") return;
+ setError(err instanceof Error ? err.message : "AI request failed");
+ } finally {
+ setIsStreaming(false);
+ abortRef.current = null;
+ }
+ },
+ [prompt, selectedTeamId, currentYaml, mode, environmentName, isStreaming],
+ );
+
+ const handleApplyToCanvas = () => {
+ try {
+ // Strip any markdown fencing the LLM might have added
+ let yaml = result.trim();
+ if (yaml.startsWith("```yaml")) yaml = yaml.slice(7);
+ if (yaml.startsWith("```")) yaml = yaml.slice(3);
+ if (yaml.endsWith("```")) yaml = yaml.slice(0, -3);
+ yaml = yaml.trim();
+
+ const { nodes: newNodes, edges: newEdges, globalConfig: importedGlobalConfig } =
+ importVectorConfig(yaml);
+
+ if (nodes.length === 0) {
+ // Empty pipeline: replace
+ loadGraph(newNodes, newEdges, importedGlobalConfig);
+ } else {
+ // Existing pipeline: add alongside (offset positions to avoid overlap)
+ const maxY = Math.max(...nodes.map((n) => n.position.y), 0);
+ const offsetNodes = newNodes.map((n) => ({
+ ...n,
+ position: { x: n.position.x, y: n.position.y + maxY + 200 },
+ }));
+ const mergedConfig = importedGlobalConfig
+ ? { ...importedGlobalConfig, ...globalConfig }
+ : globalConfig;
+ loadGraph([...nodes, ...offsetNodes], [...edges, ...newEdges], mergedConfig);
+ }
+
+ toast.success(`Applied ${newNodes.length} components to canvas`);
+ onOpenChange(false);
+ setResult("");
+ setPrompt("");
+ } catch (err) {
+ toast.error("Failed to parse YAML", {
+ description: err instanceof Error ? err.message : "Invalid YAML output",
+ });
+ }
+ };
+
+ const handleCancel = () => {
+ abortRef.current?.abort();
+ };
+
+ return (
+
+
+
+
+
+ AI Pipeline Builder
+
+
+ Describe what you want to build, or ask for a review of your current pipeline.
+
+
+
+ setMode(v as "generate" | "review")}>
+
+ Generate
+
+ Review
+
+
+
+
+
+ Describe your pipeline
+
+
+
+
+
+
+ Ask about your pipeline
+
+
+
+
+
+ {error && (
+
+ )}
+
+ {(result || isStreaming) && (
+
+
Result
+
+ {result || (
+
+
+ {mode === "generate" ? "Generating pipeline..." : "Reviewing pipeline..."}
+
+ )}
+
+ {!isStreaming && result && (
+
+ {mode === "generate" && (
+
+ Apply to Canvas
+
+ )}
+ { setResult(""); handleSubmit(); }}>
+
+ Regenerate
+
+
+ )}
+
+ )}
+
+
+ );
+}
diff --git a/src/components/flow/flow-toolbar.tsx b/src/components/flow/flow-toolbar.tsx
index ddf71228..ab0ce46f 100644
--- a/src/components/flow/flow-toolbar.tsx
+++ b/src/components/flow/flow-toolbar.tsx
@@ -21,6 +21,7 @@ import {
Info,
Clock,
X,
+ Sparkles,
} from "lucide-react";
import { toast } from "sonner";
import { Button } from "@/components/ui/button";
@@ -72,6 +73,8 @@ interface FlowToolbarProps {
processStatus?: ProcessStatusValue | null;
gitOpsMode?: string;
onDiscardChanges?: () => void;
+ aiEnabled?: boolean;
+ onAiOpen?: () => void;
}
function downloadFile(content: string, filename: string) {
@@ -103,6 +106,8 @@ export function FlowToolbar({
processStatus,
gitOpsMode,
onDiscardChanges,
+ aiEnabled,
+ onAiOpen,
}: FlowToolbarProps) {
const globalConfig = useFlowStore((s) => s.globalConfig);
const canUndo = useFlowStore((s) => s.canUndo);
@@ -330,6 +335,23 @@ export function FlowToolbar({
Save as template
+ {aiEnabled && (
+
+
+
+
+
+
+ AI pipeline builder
+
+ )}
+
{pipelineId && (
<>
diff --git a/src/components/settings-sidebar-nav.tsx b/src/components/settings-sidebar-nav.tsx
index 0b012812..7de46153 100644
--- a/src/components/settings-sidebar-nav.tsx
+++ b/src/components/settings-sidebar-nav.tsx
@@ -9,6 +9,7 @@ import {
HardDrive,
KeyRound,
Bot,
+ Sparkles,
} from "lucide-react";
export const settingsNavGroups = [
@@ -33,6 +34,7 @@ export const settingsNavGroups = [
{ title: "Teams", href: "/settings/teams", icon: Building2, requiredSuperAdmin: true },
{ title: "Team Settings", href: "/settings/team", icon: Users, requiredSuperAdmin: false },
{ title: "Service Accounts", href: "/settings/service-accounts", icon: Bot, requiredSuperAdmin: false },
+ { title: "AI", href: "/settings/ai", icon: Sparkles, requiredSuperAdmin: false },
],
},
{
diff --git a/src/components/vrl-editor/ai-input.tsx b/src/components/vrl-editor/ai-input.tsx
new file mode 100644
index 00000000..f32d7feb
--- /dev/null
+++ b/src/components/vrl-editor/ai-input.tsx
@@ -0,0 +1,186 @@
+// src/components/vrl-editor/ai-input.tsx
+"use client";
+
+import { useState, useRef, useCallback } from "react";
+import { Loader2, RotateCcw, Plus, Replace } from "lucide-react";
+import { Button } from "@/components/ui/button";
+import { Input } from "@/components/ui/input";
+import { useTeamStore } from "@/stores/team-store";
+
+interface AiInputProps {
+ currentCode: string;
+ fields?: { name: string; type: string }[];
+ componentType?: string;
+ sourceTypes?: string[];
+ onInsert: (code: string) => void;
+ onReplace: (code: string) => void;
+}
+
+export function AiInput({
+ currentCode,
+ fields,
+ componentType,
+ sourceTypes,
+ onInsert,
+ onReplace,
+}: AiInputProps) {
+ const [prompt, setPrompt] = useState("");
+ const [result, setResult] = useState("");
+ const [isStreaming, setIsStreaming] = useState(false);
+ const [error, setError] = useState(null);
+ const abortRef = useRef(null);
+ const selectedTeamId = useTeamStore((s) => s.selectedTeamId);
+
+ const handleSubmit = useCallback(
+ async (e?: React.FormEvent) => {
+ e?.preventDefault();
+ if (!prompt.trim() || !selectedTeamId || isStreaming) return;
+
+ setIsStreaming(true);
+ setResult("");
+ setError(null);
+
+ abortRef.current = new AbortController();
+
+ try {
+ const response = await fetch("/api/ai/vrl", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({
+ teamId: selectedTeamId,
+ prompt: prompt.trim(),
+ currentCode,
+ fields,
+ componentType,
+ sourceTypes,
+ }),
+ signal: abortRef.current.signal,
+ });
+
+ if (!response.ok) {
+ const errData = await response.json().catch(() => ({ error: "Request failed" }));
+ throw new Error(errData.error || `HTTP ${response.status}`);
+ }
+
+ const reader = response.body?.getReader();
+ if (!reader) throw new Error("No response stream");
+
+ const decoder = new TextDecoder();
+ let buffer = "";
+
+ while (true) {
+ const { done, value } = await reader.read();
+ if (done) break;
+
+ buffer += decoder.decode(value, { stream: true });
+ const lines = buffer.split("\n");
+ buffer = lines.pop() ?? "";
+
+ for (const line of lines) {
+ const trimmed = line.trim();
+ if (!trimmed || !trimmed.startsWith("data: ")) continue;
+
+ try {
+ const data = JSON.parse(trimmed.slice(6));
+ if (data.done) break;
+ if (data.error) throw new Error(data.error);
+ if (data.token) {
+ setResult((prev) => prev + data.token);
+ }
+ } catch (parseErr) {
+ if (parseErr instanceof Error && parseErr.message !== "Unexpected end of JSON input") {
+ throw parseErr;
+ }
+ }
+ }
+ }
+ } catch (err) {
+ if (err instanceof Error && err.name === "AbortError") return;
+ setError(err instanceof Error ? err.message : "AI request failed");
+ } finally {
+ setIsStreaming(false);
+ abortRef.current = null;
+ }
+ },
+ [prompt, selectedTeamId, currentCode, fields, componentType, sourceTypes, isStreaming],
+ );
+
+ const handleCancel = () => {
+ abortRef.current?.abort();
+ };
+
+ const handleRegenerate = () => {
+ setResult("");
+ handleSubmit();
+ };
+
+ return (
+
+
+
+ {error && (
+
+ {error}
+
+ )}
+
+ {(result || isStreaming) && (
+
+
+ {result || (
+
+
+ Generating...
+
+ )}
+
+ {!isStreaming && result && (
+
+
onInsert(result)}
+ >
+
+ Insert
+
+
onReplace(result)}
+ >
+
+ Replace
+
+
+
+ Regenerate
+
+
+ )}
+
+ )}
+
+ );
+}
diff --git a/src/components/vrl-editor/vrl-editor.tsx b/src/components/vrl-editor/vrl-editor.tsx
index 371b37a1..c8e9d78e 100644
--- a/src/components/vrl-editor/vrl-editor.tsx
+++ b/src/components/vrl-editor/vrl-editor.tsx
@@ -4,7 +4,7 @@ import { useState, useCallback, useRef, useMemo, useEffect } from "react";
import dynamic from "next/dynamic";
import { useTRPC } from "@/trpc/client";
import { useMutation, useQuery } from "@tanstack/react-query";
-import { BookOpen, Code, ChevronLeft, ChevronRight, Columns3, Download, Loader2 } from "lucide-react";
+import { BookOpen, Code, ChevronLeft, ChevronRight, Columns3, Download, Loader2, Sparkles } from "lucide-react";
import { Button } from "@/components/ui/button";
import { Badge } from "@/components/ui/badge";
import { Skeleton } from "@/components/ui/skeleton";
@@ -26,6 +26,8 @@ import { vrlTheme } from "./vrl-theme";
import { VRL_SNIPPETS } from "@/lib/vrl/snippets";
import { VrlSnippetDrawer } from "@/components/flow/vrl-snippet-drawer";
import { VrlFieldsPanel } from "./vrl-fields-panel";
+import { AiInput } from "./ai-input";
+import { useTeamStore } from "@/stores/team-store";
import { getMergedOutputSchemas, getSourceOutputSchema } from "@/lib/vector/source-output-schemas";
import type { Monaco, OnMount } from "@monaco-editor/react";
@@ -64,7 +66,7 @@ export function VrlEditor({ value, onChange, sourceTypes, pipelineId, upstreamSo
const [sampleInput, setSampleInput] = useState("");
const [testOutput, setTestOutput] = useState(null);
const [testError, setTestError] = useState(null);
- const [toolsPanel, setToolsPanel] = useState<"fields" | "snippets" | null>(null);
+ const [toolsPanel, setToolsPanel] = useState<"fields" | "snippets" | "ai" | null>(null);
const [expanded, setExpanded] = useState(false);
const editorRef = useRef(null);
const monacoRef = useRef(null);
@@ -77,6 +79,15 @@ export function VrlEditor({ value, onChange, sourceTypes, pipelineId, upstreamSo
const [sampleIndex, setSampleIndex] = useState(0);
const [liveSchemaFields, setLiveSchemaFields] = useState>([]);
+ const selectedTeamId = useTeamStore((s) => s.selectedTeamId);
+ const teamQuery = useQuery(
+ trpc.team.get.queryOptions(
+ { id: selectedTeamId! },
+ { enabled: !!selectedTeamId },
+ ),
+ );
+ const aiEnabled = teamQuery.data?.aiEnabled ?? false;
+
const isRawTextSource = useMemo(() => {
if (!sourceTypes || sourceTypes.length === 0) return false;
return sourceTypes.some((t) => {
@@ -92,6 +103,19 @@ export function VrlEditor({ value, onChange, sourceTypes, pipelineId, upstreamSo
[sourceTypes],
);
+ const mergedFieldsForAi = useMemo(() => {
+ const staticFields = getMergedOutputSchemas(sourceTypes ?? []);
+ const liveByPath = new Map(liveSchemaFields.map((f) => [f.path, f]));
+ const all = staticFields.map((f) => {
+ liveByPath.delete(f.path);
+ return { name: f.path.replace(/^\./, ""), type: f.type };
+ });
+ for (const [, f] of liveByPath) {
+ all.push({ name: f.path.replace(/^\./, ""), type: f.type });
+ }
+ return all;
+ }, [sourceTypes, liveSchemaFields]);
+
const testMutation = useMutation(
trpc.vrl.test.mutationOptions({
onSuccess: (data) => {
@@ -368,6 +392,16 @@ export function VrlEditor({ value, onChange, sourceTypes, pipelineId, upstreamSo
Snippets
+ {aiEnabled && (
+ setToolsPanel((prev) => (prev === "ai" ? null : "ai"))}
+ >
+
+ AI
+
+ )}
{pipelineId && upstreamSourceKeys && upstreamSourceKeys.length > 0 && (
<>
)}
+ {/* AI panel */}
+ {toolsPanel === "ai" && (
+ {
+ const newValue = value ? `${value}\n${code}` : code;
+ onChange(newValue);
+ }}
+ onReplace={(code) => {
+ onChange(code);
+ }}
+ />
+ )}
+
{/* Test panel (always visible) */}
diff --git a/src/lib/ai/prompts.ts b/src/lib/ai/prompts.ts
new file mode 100644
index 00000000..bfe5cb92
--- /dev/null
+++ b/src/lib/ai/prompts.ts
@@ -0,0 +1,80 @@
+// src/lib/ai/prompts.ts
+
+import { VRL_REFERENCE } from "./vrl-reference";
+
+export function buildVrlSystemPrompt(context: {
+ fields?: { name: string; type: string }[];
+ currentCode?: string;
+ componentType?: string;
+ sourceTypes?: string[];
+}): string {
+ const parts: string[] = [
+ "You are a VRL (Vector Remap Language) code assistant for Vector data pipelines.",
+ "Generate VRL code based on the user's request. Output ONLY the VRL code — no explanations, no markdown fencing, no comments unless the user asks for them.",
+ "",
+ "=== VRL Function Reference ===",
+ VRL_REFERENCE,
+ ];
+
+ if (context.sourceTypes?.length) {
+ parts.push("", `Connected source types: ${context.sourceTypes.join(", ")}`);
+ }
+
+ if (context.componentType) {
+ parts.push(`Transform component type: ${context.componentType}`);
+ }
+
+ if (context.fields?.length) {
+ parts.push("", "Available fields in the event:");
+ for (const f of context.fields) {
+ parts.push(` .${f.name} (${f.type})`);
+ }
+ }
+
+ if (context.currentCode?.trim()) {
+ parts.push("", "Current VRL code in the editor:", "```", context.currentCode, "```");
+ }
+
+ return parts.join("\n");
+}
+
+export function buildPipelineSystemPrompt(context: {
+ mode: "generate" | "review";
+ currentYaml?: string;
+ componentTypes?: string[];
+ environmentName?: string;
+}): string {
+ const parts: string[] = [];
+
+ if (context.mode === "generate") {
+ parts.push(
+ "You are a Vector pipeline generator.",
+ "Generate a valid Vector YAML configuration with sources, transforms, and/or sinks sections based on the user's description.",
+ "Output ONLY valid Vector YAML — no explanations, no markdown fencing.",
+ "",
+ "Rules:",
+ "- Use descriptive component keys (e.g., kafka_source, parse_logs, datadog_sink)",
+ "- Connect components via the `inputs` field in transforms and sinks",
+ "- Use realistic default values for ports, endpoints, etc.",
+ '- For sensitive values use placeholder format: "${ENV_VAR_NAME}"',
+ );
+ } else {
+ parts.push(
+ "You are a Vector pipeline configuration reviewer.",
+ "Analyze the provided Vector pipeline YAML and provide improvement suggestions.",
+ "Focus on: performance, correctness, best practices, and potential issues.",
+ "If the user asks for a revised config, output the complete corrected YAML with no markdown fencing.",
+ "Otherwise, provide suggestions as concise text.",
+ );
+ }
+
+ if (context.environmentName) {
+ parts.push("", `Environment: ${context.environmentName}`);
+ }
+
+ if (context.currentYaml?.trim()) {
+ parts.push("", "Current pipeline configuration:", "```yaml", context.currentYaml, "```");
+ }
+
+ return parts.join("\n");
+}
diff --git a/src/lib/ai/rate-limiter.ts b/src/lib/ai/rate-limiter.ts
new file mode 100644
index 00000000..a1b2b620
--- /dev/null
+++ b/src/lib/ai/rate-limiter.ts
@@ -0,0 +1,38 @@
+// src/lib/ai/rate-limiter.ts
+
+/**
+ * In-memory token bucket rate limiter.
+ * Tracks per-team request counts with a fixed window.
+ */
+
+interface Bucket {
+ tokens: number;
+ lastRefill: number;
+}
+
+const buckets = new Map
();
+
+const DEFAULT_MAX_REQUESTS = 60;
+const WINDOW_MS = 60 * 60 * 1000; // 1 hour
+
+export function checkRateLimit(
+ teamId: string,
+ maxRequests = DEFAULT_MAX_REQUESTS,
+): { allowed: boolean; remaining: number; resetAt: number } {
+ const now = Date.now();
+ let bucket = buckets.get(teamId);
+
+ if (!bucket || now - bucket.lastRefill >= WINDOW_MS) {
+ bucket = { tokens: maxRequests, lastRefill: now };
+ buckets.set(teamId, bucket);
+ }
+
+ const resetAt = bucket.lastRefill + WINDOW_MS;
+
+ if (bucket.tokens <= 0) {
+ return { allowed: false, remaining: 0, resetAt };
+ }
+
+ bucket.tokens -= 1;
+ return { allowed: true, remaining: bucket.tokens, resetAt };
+}
diff --git a/src/lib/ai/vrl-reference.ts b/src/lib/ai/vrl-reference.ts
new file mode 100644
index 00000000..b54347b5
--- /dev/null
+++ b/src/lib/ai/vrl-reference.ts
@@ -0,0 +1,304 @@
+// Auto-generated from vrl-reference.txt — inlined for Next.js production compatibility.
+// Do NOT edit manually. Update vrl-reference.txt and re-generate if needed.
+
+export const VRL_REFERENCE = `# VRL Function Reference (Vector Remap Language)
+# Compact reference for LLM context. Updated for Vector 0.53.
+
+## Type Functions
+to_string(value) -> string | error
+ Convert any value to a string.
+ Example: to_string(42) => "42"
+
+to_int(value) -> integer | error
+ Convert a value to an integer.
+ Example: to_int("42") => 42
+
+to_float(value) -> float | error
+ Convert a value to a float.
+ Example: to_float("3.14") => 3.14
+
+to_bool(value) -> boolean | error
+ Convert a value to a boolean.
+ Example: to_bool("true") => true
+
+to_timestamp(value, [format]) -> timestamp | error
+ Parse a value into a timestamp.
+ Example: to_timestamp("2024-01-15T10:30:00Z") =>
+
+to_unix_timestamp(timestamp, [unit]) -> integer
+ Convert timestamp to Unix epoch. unit: "seconds"|"milliseconds"|"nanoseconds"
+ Example: to_unix_timestamp(now(), unit: "seconds") => 1705312200
+
+## String Functions
+contains(string, substring, [case_sensitive]) -> boolean
+ Check if string contains substring.
+ Example: contains("hello world", "world") => true
+
+starts_with(string, prefix, [case_sensitive]) -> boolean
+ Check if string starts with prefix.
+ Example: starts_with("hello", "hel") => true
+
+ends_with(string, suffix, [case_sensitive]) -> boolean
+ Check if string ends with suffix.
+ Example: ends_with("hello.log", ".log") => true
+
+slice(string, start, [end]) -> string
+ Extract substring by index.
+ Example: slice("hello", 0, 3) => "hel"
+
+replace(string, pattern, replacement, [count]) -> string
+ Replace occurrences of pattern.
+ Example: replace("foo-bar", "-", "_") => "foo_bar"
+
+split(string, separator, [limit]) -> [string]
+ Split string into array.
+ Example: split("a,b,c", ",") => ["a", "b", "c"]
+
+join(array, [separator]) -> string
+ Join array into string.
+ Example: join(["a", "b"], ",") => "a,b"
+
+upcase(string) -> string
+ Convert to uppercase.
+ Example: upcase("hello") => "HELLO"
+
+downcase(string) -> string
+ Convert to lowercase.
+ Example: downcase("HELLO") => "hello"
+
+strip_whitespace(string) -> string
+ Remove leading/trailing whitespace.
+ Example: strip_whitespace(" hello ") => "hello"
+
+truncate(string, limit, [ellipsis], [suffix]) -> string
+ Truncate string to max length.
+ Example: truncate("hello world", 5, suffix: "...") => "hello..."
+
+strlen(string) -> integer
+ Return string length.
+ Example: strlen("hello") => 5
+
+## Parse Functions
+parse_json(string) -> any | error
+ Parse JSON string into VRL value.
+ Example: parse_json("{\\"key\\":\\"value\\"}") => {"key": "value"}
+
+parse_syslog(string) -> object | error
+ Parse syslog message (RFC 5424/3164).
+ Example: parse_syslog("<34>1 2024-01-15T10:30:00Z host app - - msg") => {facility: "auth", severity: "crit", ...}
+
+parse_csv(string, [delimiter]) -> [string]
+ Parse CSV row.
+ Example: parse_csv("a,b,c") => ["a", "b", "c"]
+
+parse_key_value(string, [separator], [field_delimiter]) -> object | error
+ Parse key=value pairs.
+ Example: parse_key_value("a=1 b=2") => {"a": "1", "b": "2"}
+
+parse_grok(string, pattern) -> object | error
+ Parse with Grok pattern.
+ Example: parse_grok("55.3.244.1 GET /index", "%{IP:client} %{WORD:method} %{URIPATHPARAM:request}") => {"client": "55.3.244.1", ...}
+
+parse_regex(string, pattern) -> object | error
+ Parse with regex named captures.
+ Example: parse_regex("user=bob age=30", r'user=(?P\\w+) age=(?P\\d+)') => {"user": "bob", "age": "30"}
+
+parse_timestamp(string, format) -> timestamp | error
+ Parse timestamp with strftime format.
+ Example: parse_timestamp("2024-01-15", "%Y-%m-%d") =>
+
+parse_url(string) -> object | error
+ Parse URL into components.
+ Example: parse_url("https://example.com:8080/path?q=1") => {scheme: "https", host: "example.com", port: 8080, ...}
+
+## Encode Functions
+encode_json(value) -> string
+ Encode value as JSON.
+ Example: encode_json({"key": "value"}) => "{\\"key\\":\\"value\\"}"
+
+encode_base64(string) -> string
+ Base64 encode.
+ Example: encode_base64("hello") => "aGVsbG8="
+
+decode_base64(string) -> string | error
+ Base64 decode.
+ Example: decode_base64("aGVsbG8=") => "hello"
+
+## Coerce Functions
+to_syslog_level(integer) -> string | error
+ Convert syslog level number to name.
+ Example: to_syslog_level(0) => "emerg"
+
+to_syslog_facility(integer) -> string | error
+ Convert syslog facility number to name.
+ Example: to_syslog_facility(0) => "kern"
+
+## Object/Array Functions
+keys(object) -> [string]
+ Get object keys.
+ Example: keys({"a": 1, "b": 2}) => ["a", "b"]
+
+values(object) -> [any]
+ Get object values.
+ Example: values({"a": 1, "b": 2}) => [1, 2]
+
+length(value) -> integer
+ Length of string, array, or object.
+ Example: length([1, 2, 3]) => 3
+
+flatten(array) -> array
+ Flatten nested arrays.
+ Example: flatten([[1, 2], [3]]) => [1, 2, 3]
+
+append(array, value) -> array
+ Append value to array.
+ Example: append([1, 2], 3) => [1, 2, 3]
+
+push(array, value) -> array
+ Same as append.
+
+compact(value) -> value
+ Remove null values from object/array.
+ Example: compact({"a": 1, "b": null}) => {"a": 1}
+
+merge(object1, object2, [deep]) -> object
+ Merge objects. Later values win.
+ Example: merge({"a": 1}, {"b": 2}) => {"a": 1, "b": 2}
+
+set(object, path, value) -> object
+ Set nested value by path array.
+ Example: set({}, ["a", "b"], 1) => {"a": {"b": 1}}
+
+get(object, path) -> any | null
+ Get nested value by path array.
+ Example: get({"a": {"b": 1}}, ["a", "b"]) => 1
+
+del(object, path) -> any
+ Delete and return value at path.
+ Example: del(., .temp_field) removes .temp_field
+
+exists(path) -> boolean
+ Check if field exists.
+ Example: exists(.hostname) => true/false
+
+## IP Functions
+ip_cidr_contains(cidr, ip) -> boolean
+ Check if IP is in CIDR range.
+ Example: ip_cidr_contains("192.168.0.0/16", "192.168.1.1") => true
+
+ip_to_ipv6(ip) -> string
+ Convert IPv4 to IPv6 mapped address.
+ Example: ip_to_ipv6("1.2.3.4") => "::ffff:1.2.3.4"
+
+## Hash/Crypto Functions
+sha2(string, [variant]) -> string
+ SHA-2 hash. variant: 224|256|384|512 (default 256).
+ Example: sha2("hello") => "2cf24dba..."
+
+md5(string) -> string
+ MD5 hash.
+ Example: md5("hello") => "5d41402a..."
+
+hmac(value, key, [algorithm]) -> string
+ HMAC signature. algorithm: "SHA-256" (default).
+ Example: hmac("message", "secret") => "aa747c..."
+
+uuid_v4() -> string
+ Generate UUID v4.
+ Example: uuid_v4() => "550e8400-e29b-41d4-a716-446655440000"
+
+uuid_v7() -> string
+ Generate UUID v7 (time-sorted).
+ Example: uuid_v7() => "01876d34-..."
+
+## Timestamp Functions
+now() -> timestamp
+ Current UTC timestamp.
+ Example: .timestamp = now()
+
+format_timestamp(timestamp, format, [timezone]) -> string
+ Format timestamp as string.
+ Example: format_timestamp(now(), "%Y-%m-%d") => "2024-01-15"
+
+## Numeric Functions
+ceil(float) -> integer
+ Round up.
+ Example: ceil(3.2) => 4
+
+floor(float) -> integer
+ Round down.
+ Example: floor(3.8) => 3
+
+round(float, [precision]) -> float
+ Round to precision.
+ Example: round(3.456, precision: 2) => 3.46
+
+mod(value, modulus) -> integer
+ Modulo.
+ Example: mod(7, 3) => 1
+
+abs(number) -> number
+ Absolute value.
+ Example: abs(-5) => 5
+
+## Event Metadata Functions
+set_semantic_meaning(target, meaning) -> null
+ Annotate field with semantic meaning (e.g., "timestamp", "message", "host").
+ Example: set_semantic_meaning(.ts, "timestamp")
+
+log(message, [level], [rate_limit_secs]) -> null
+ Emit a log message during processing.
+ Example: log("processing event", level: "info")
+
+assert(condition, message) -> null | error
+ Assert condition is true.
+ Example: assert(exists(.message), "message field required")
+
+abort -> never
+ Drop the event.
+ Example: if .level == "debug" { abort }
+
+## Enrichment Functions
+get_enrichment_table_record(table, condition) -> object | error
+ Look up record from enrichment table.
+ Example: get_enrichment_table_record("geoip", {"ip": .client_ip})
+
+find_enrichment_table_records(table, condition) -> [object] | error
+ Find all matching records from enrichment table.
+ Example: find_enrichment_table_records("users", {"status": "active"})
+
+## Common VRL Patterns
+
+# Rename field
+.new_name = del(.old_name)
+
+# Add/set field
+.environment = "production"
+
+# Conditional field
+if exists(.user_agent) {
+ .browser = parse_regex!(.user_agent, r'(?PChrome|Firefox|Safari)')
+}
+
+# Drop event
+if .level == "debug" { abort }
+
+# Coalesce (first non-null)
+.host = .hostname ?? .host ?? "unknown"
+
+# Error handling with ! (abort on error)
+.parsed = parse_json!(.message)
+
+# Error handling with ?? (fallback)
+.parsed = parse_json(.message) ?? {}
+
+# Map over nested fields
+.tags = map_values(.tags) -> |_key, value| { downcase!(value) }
+
+# Redact sensitive data
+.email = redact(.email, filters: ["pattern"], redactor: {"type": "text", "replacement": "[REDACTED]"}, patterns: [r'\\S+@\\S+'])
+
+# Type coercion
+.status_code = to_int!(.status_code)
+.timestamp = to_timestamp!(.timestamp)
+`;
diff --git a/src/lib/ai/vrl-reference.txt b/src/lib/ai/vrl-reference.txt
new file mode 100644
index 00000000..d5a84928
--- /dev/null
+++ b/src/lib/ai/vrl-reference.txt
@@ -0,0 +1,300 @@
+# VRL Function Reference (Vector Remap Language)
+# Compact reference for LLM context. Updated for Vector 0.53.
+
+## Type Functions
+to_string(value) -> string | error
+ Convert any value to a string.
+ Example: to_string(42) => "42"
+
+to_int(value) -> integer | error
+ Convert a value to an integer.
+ Example: to_int("42") => 42
+
+to_float(value) -> float | error
+ Convert a value to a float.
+ Example: to_float("3.14") => 3.14
+
+to_bool(value) -> boolean | error
+ Convert a value to a boolean.
+ Example: to_bool("true") => true
+
+to_timestamp(value, [format]) -> timestamp | error
+ Parse a value into a timestamp.
+ Example: to_timestamp("2024-01-15T10:30:00Z") =>
+
+to_unix_timestamp(timestamp, [unit]) -> integer
+ Convert timestamp to Unix epoch. unit: "seconds"|"milliseconds"|"nanoseconds"
+ Example: to_unix_timestamp(now(), unit: "seconds") => 1705312200
+
+## String Functions
+contains(string, substring, [case_sensitive]) -> boolean
+ Check if string contains substring.
+ Example: contains("hello world", "world") => true
+
+starts_with(string, prefix, [case_sensitive]) -> boolean
+ Check if string starts with prefix.
+ Example: starts_with("hello", "hel") => true
+
+ends_with(string, suffix, [case_sensitive]) -> boolean
+ Check if string ends with suffix.
+ Example: ends_with("hello.log", ".log") => true
+
+slice(string, start, [end]) -> string
+ Extract substring by index.
+ Example: slice("hello", 0, 3) => "hel"
+
+replace(string, pattern, replacement, [count]) -> string
+ Replace occurrences of pattern.
+ Example: replace("foo-bar", "-", "_") => "foo_bar"
+
+split(string, separator, [limit]) -> [string]
+ Split string into array.
+ Example: split("a,b,c", ",") => ["a", "b", "c"]
+
+join(array, [separator]) -> string
+ Join array into string.
+ Example: join(["a", "b"], ",") => "a,b"
+
+upcase(string) -> string
+ Convert to uppercase.
+ Example: upcase("hello") => "HELLO"
+
+downcase(string) -> string
+ Convert to lowercase.
+ Example: downcase("HELLO") => "hello"
+
+strip_whitespace(string) -> string
+ Remove leading/trailing whitespace.
+ Example: strip_whitespace(" hello ") => "hello"
+
+truncate(string, limit, [ellipsis], [suffix]) -> string
+ Truncate string to max length.
+ Example: truncate("hello world", 5, suffix: "...") => "hello..."
+
+strlen(string) -> integer
+ Return string length.
+ Example: strlen("hello") => 5
+
+## Parse Functions
+parse_json(string) -> any | error
+ Parse JSON string into VRL value.
+ Example: parse_json("{\"key\":\"value\"}") => {"key": "value"}
+
+parse_syslog(string) -> object | error
+ Parse syslog message (RFC 5424/3164).
+ Example: parse_syslog("<34>1 2024-01-15T10:30:00Z host app - - msg") => {facility: "auth", severity: "crit", ...}
+
+parse_csv(string, [delimiter]) -> [string]
+ Parse CSV row.
+ Example: parse_csv("a,b,c") => ["a", "b", "c"]
+
+parse_key_value(string, [separator], [field_delimiter]) -> object | error
+ Parse key=value pairs.
+ Example: parse_key_value("a=1 b=2") => {"a": "1", "b": "2"}
+
+parse_grok(string, pattern) -> object | error
+ Parse with Grok pattern.
+ Example: parse_grok("55.3.244.1 GET /index", "%{IP:client} %{WORD:method} %{URIPATHPARAM:request}") => {"client": "55.3.244.1", ...}
+
+parse_regex(string, pattern) -> object | error
+ Parse with regex named captures.
+  Example: parse_regex("user=bob age=30", r'user=(?P<user>\w+) age=(?P<age>\d+)') => {"user": "bob", "age": "30"}
+
+parse_timestamp(string, format) -> timestamp | error
+ Parse timestamp with strftime format.
+ Example: parse_timestamp("2024-01-15", "%Y-%m-%d") =>
+
+parse_url(string) -> object | error
+ Parse URL into components.
+ Example: parse_url("https://example.com:8080/path?q=1") => {scheme: "https", host: "example.com", port: 8080, ...}
+
+## Encode Functions
+encode_json(value) -> string
+ Encode value as JSON.
+ Example: encode_json({"key": "value"}) => "{\"key\":\"value\"}"
+
+encode_base64(string) -> string
+ Base64 encode.
+ Example: encode_base64("hello") => "aGVsbG8="
+
+decode_base64(string) -> string | error
+ Base64 decode.
+ Example: decode_base64("aGVsbG8=") => "hello"
+
+## Coerce Functions
+to_syslog_level(integer) -> string | error
+ Convert syslog level number to name.
+ Example: to_syslog_level(0) => "emerg"
+
+to_syslog_facility(integer) -> string | error
+ Convert syslog facility number to name.
+ Example: to_syslog_facility(0) => "kern"
+
+## Object/Array Functions
+keys(object) -> [string]
+ Get object keys.
+ Example: keys({"a": 1, "b": 2}) => ["a", "b"]
+
+values(object) -> [any]
+ Get object values.
+ Example: values({"a": 1, "b": 2}) => [1, 2]
+
+length(value) -> integer
+ Length of string, array, or object.
+ Example: length([1, 2, 3]) => 3
+
+flatten(array) -> array
+ Flatten nested arrays.
+ Example: flatten([[1, 2], [3]]) => [1, 2, 3]
+
+append(array, value) -> array
+ Append value to array.
+ Example: append([1, 2], 3) => [1, 2, 3]
+
+push(array, value) -> array
+ Same as append.
+
+compact(value) -> value
+ Remove null values from object/array.
+ Example: compact({"a": 1, "b": null}) => {"a": 1}
+
+merge(object1, object2, [deep]) -> object
+ Merge objects. Later values win.
+ Example: merge({"a": 1}, {"b": 2}) => {"a": 1, "b": 2}
+
+set(object, path, value) -> object
+ Set nested value by path array.
+ Example: set({}, ["a", "b"], 1) => {"a": {"b": 1}}
+
+get(object, path) -> any | null
+ Get nested value by path array.
+ Example: get({"a": {"b": 1}}, ["a", "b"]) => 1
+
+del(object, path) -> any
+ Delete and return value at path.
+ Example: del(., .temp_field) removes .temp_field
+
+exists(path) -> boolean
+ Check if field exists.
+ Example: exists(.hostname) => true/false
+
+## IP Functions
+ip_cidr_contains(cidr, ip) -> boolean
+ Check if IP is in CIDR range.
+ Example: ip_cidr_contains("192.168.0.0/16", "192.168.1.1") => true
+
+ip_to_ipv6(ip) -> string
+ Convert IPv4 to IPv6 mapped address.
+ Example: ip_to_ipv6("1.2.3.4") => "::ffff:1.2.3.4"
+
+## Hash/Crypto Functions
+sha2(string, [variant]) -> string
+ SHA-2 hash. variant: 224|256|384|512 (default 256).
+ Example: sha2("hello") => "2cf24dba..."
+
+md5(string) -> string
+ MD5 hash.
+ Example: md5("hello") => "5d41402a..."
+
+hmac(value, key, [algorithm]) -> string
+ HMAC signature. algorithm: "SHA-256" (default).
+ Example: hmac("message", "secret") => "aa747c..."
+
+uuid_v4() -> string
+ Generate UUID v4.
+ Example: uuid_v4() => "550e8400-e29b-41d4-a716-446655440000"
+
+uuid_v7() -> string
+ Generate UUID v7 (time-sorted).
+ Example: uuid_v7() => "01876d34-..."
+
+## Timestamp Functions
+now() -> timestamp
+ Current UTC timestamp.
+ Example: .timestamp = now()
+
+format_timestamp(timestamp, format, [timezone]) -> string
+ Format timestamp as string.
+ Example: format_timestamp(now(), "%Y-%m-%d") => "2024-01-15"
+
+## Numeric Functions
+ceil(float) -> integer
+ Round up.
+ Example: ceil(3.2) => 4
+
+floor(float) -> integer
+ Round down.
+ Example: floor(3.8) => 3
+
+round(float, [precision]) -> float
+ Round to precision.
+ Example: round(3.456, precision: 2) => 3.46
+
+mod(value, modulus) -> integer
+ Modulo.
+ Example: mod(7, 3) => 1
+
+abs(number) -> number
+ Absolute value.
+ Example: abs(-5) => 5
+
+## Event Metadata Functions
+set_semantic_meaning(target, meaning) -> null
+ Annotate field with semantic meaning (e.g., "timestamp", "message", "host").
+ Example: set_semantic_meaning(.ts, "timestamp")
+
+log(message, [level], [rate_limit_secs]) -> null
+ Emit a log message during processing.
+ Example: log("processing event", level: "info")
+
+assert(condition, message) -> null | error
+ Assert condition is true.
+ Example: assert(exists(.message), "message field required")
+
+abort -> never
+ Drop the event.
+ Example: if .level == "debug" { abort }
+
+## Enrichment Functions
+get_enrichment_table_record(table, condition) -> object | error
+ Look up record from enrichment table.
+ Example: get_enrichment_table_record("geoip", {"ip": .client_ip})
+
+find_enrichment_table_records(table, condition) -> [object] | error
+ Find all matching records from enrichment table.
+ Example: find_enrichment_table_records("users", {"status": "active"})
+
+## Common VRL Patterns
+
+# Rename field
+.new_name = del(.old_name)
+
+# Add/set field
+.environment = "production"
+
+# Conditional field
+if exists(.user_agent) {
+  .browser = parse_regex!(.user_agent, r'(?P<browser>Chrome|Firefox|Safari)')
+}
+
+# Drop event
+if .level == "debug" { abort }
+
+# Coalesce (first non-null)
+.host = .hostname ?? .host ?? "unknown"
+
+# Error handling with ! (abort on error)
+.parsed = parse_json!(.message)
+
+# Error handling with ?? (fallback)
+.parsed = parse_json(.message) ?? {}
+
+# Map over nested fields
+.tags = map_values(.tags) -> |_key, value| { downcase!(value) }
+
+# Redact sensitive data
+.email = redact(.email, filters: ["pattern"], redactor: {"type": "text", "replacement": "[REDACTED]"}, patterns: [r'\S+@\S+'])
+
+# Type coercion
+.status_code = to_int!(.status_code)
+.timestamp = to_timestamp!(.timestamp)
diff --git a/src/server/middleware/audit.ts b/src/server/middleware/audit.ts
index 91bf60ac..0dcb8142 100644
--- a/src/server/middleware/audit.ts
+++ b/src/server/middleware/audit.ts
@@ -6,6 +6,7 @@ const SENSITIVE_KEYS = new Set([
"password", "currentPassword", "newPassword",
"token", "secret", "key", "keyBase64",
"passwordHash", "httpsToken", "sshKey",
+ "aiApiKey",
]);
function sanitizeInput(input: unknown): unknown {
diff --git a/src/server/routers/team.ts b/src/server/routers/team.ts
index a1d3c5fa..a289f2cb 100644
--- a/src/server/routers/team.ts
+++ b/src/server/routers/team.ts
@@ -5,6 +5,8 @@ import { prisma } from "@/lib/prisma";
import bcrypt from "bcryptjs";
import crypto from "crypto";
import { withAudit } from "@/server/middleware/audit";
+import { encrypt } from "@/server/services/crypto";
+import { testAiConnection } from "@/server/services/ai";
/**
* Block manual team assignment/role changes for OIDC users when their
@@ -76,7 +78,7 @@ export const teamRouter = router({
select: { isSuperAdmin: true },
});
- return prisma.team.findMany({
+ const teams = await prisma.team.findMany({
where: {
name: { not: "__system__" },
...(user?.isSuperAdmin ? {} : { members: { some: { userId } } }),
@@ -86,6 +88,9 @@ export const teamRouter = router({
},
orderBy: { createdAt: "desc" },
});
+ // Strip encrypted API key — never send to client
+ // eslint-disable-next-line @typescript-eslint/no-unused-vars
+ return teams.map(({ aiApiKey: _aiApiKey, ...safeTeam }) => safeTeam);
}),
get: protectedProcedure
@@ -104,7 +109,10 @@ export const teamRouter = router({
if (!team) {
throw new TRPCError({ code: "NOT_FOUND", message: "Team not found" });
}
- return team;
+ // Strip encrypted API key — never send to client
+ // eslint-disable-next-line @typescript-eslint/no-unused-vars
+ const { aiApiKey: _aiApiKey, ...safeTeam } = team;
+ return safeTeam;
}),
create: protectedProcedure
@@ -466,4 +474,64 @@ export const teamRouter = router({
select: { id: true, authMethod: true },
});
}),
+
  // Return the team's AI settings for the admin settings page.
  // The stored (encrypted) API key itself is never sent to the client —
  // only a boolean indicating whether one exists.
  getAiConfig: protectedProcedure
    .use(withTeamAccess("ADMIN"))
    .input(z.object({ teamId: z.string() }))
    .query(async ({ input }) => {
      const team = await prisma.team.findUniqueOrThrow({
        where: { id: input.teamId },
        select: {
          aiEnabled: true,
          aiProvider: true,
          aiBaseUrl: true,
          aiModel: true,
          aiApiKey: true,
        },
      });
      return {
        aiEnabled: team.aiEnabled,
        aiProvider: team.aiProvider,
        aiBaseUrl: team.aiBaseUrl,
        aiModel: team.aiModel,
        // Expose presence only, never the key material.
        hasApiKey: !!team.aiApiKey,
      };
    }),
+
+ updateAiConfig: protectedProcedure
+ .use(withTeamAccess("ADMIN"))
+ .use(withAudit("team.ai_config_updated", "Team"))
+ .input(
+ z.object({
+ teamId: z.string(),
+ aiEnabled: z.boolean().optional(),
+ aiProvider: z.enum(["openai", "anthropic", "custom"]).nullable().optional(),
+ aiBaseUrl: z.string().nullable().optional(),
+ aiModel: z.string().nullable().optional(),
+ aiApiKey: z.string().nullable().optional(),
+ }),
+ )
+ .mutation(async ({ input }) => {
+ const { teamId, aiApiKey, ...rest } = input;
+ const data: Record = { ...rest };
+
+ // Encrypt API key if provided
+ if (aiApiKey !== undefined) {
+ data.aiApiKey = aiApiKey ? `enc:${encrypt(aiApiKey)}` : null;
+ }
+
+ return prisma.team.update({
+ where: { id: teamId },
+ data,
+ select: { id: true, aiEnabled: true, aiProvider: true, aiBaseUrl: true, aiModel: true },
+ });
+ }),
+
  // Delegate to the AI service's connection test so admins can verify the
  // saved configuration before enabling AI features for the team.
  testAiConnection: protectedProcedure
    .use(withTeamAccess("ADMIN"))
    .use(withAudit("team.ai_connection_tested", "Team"))
    .input(z.object({ teamId: z.string() }))
    .mutation(async ({ input }) => {
      return testAiConnection(input.teamId);
    }),
});
diff --git a/src/server/services/ai.ts b/src/server/services/ai.ts
new file mode 100644
index 00000000..b29aa182
--- /dev/null
+++ b/src/server/services/ai.ts
@@ -0,0 +1,207 @@
+import { prisma } from "@/lib/prisma";
+import { decrypt } from "./crypto";
+import { checkRateLimit } from "@/lib/ai/rate-limiter";
+
+const ENCRYPTED_PREFIX = "enc:";
+
+const ALLOWED_PROTOCOLS = new Set(["http:", "https:"]);
+
+function validateBaseUrl(baseUrl: string): void {
+ let parsed: URL;
+ try {
+ parsed = new URL(baseUrl);
+ } catch {
+ throw new Error("Invalid AI base URL");
+ }
+
+ if (!ALLOWED_PROTOCOLS.has(parsed.protocol)) {
+ throw new Error("AI base URL must use http or https");
+ }
+
+ // URL.hostname strips brackets from IPv6 and normalises numeric encodings
+ const hostname = parsed.hostname.toLowerCase();
+
+ const isBlocked =
+ // Loopback
+ hostname === "localhost" ||
+ hostname === "127.0.0.1" ||
+ hostname === "::1" ||
+ hostname === "0.0.0.0" ||
+ hostname === "::" ||
+ // mDNS / internal TLDs
+ hostname.endsWith(".local") ||
+ hostname.endsWith(".internal") ||
+ // IPv4 private ranges
+ hostname.startsWith("10.") ||
+ hostname.startsWith("192.168.") ||
+ /^172\.(1[6-9]|2\d|3[01])\./.test(hostname) ||
+ // Link-local (full range)
+ hostname.startsWith("169.254.") ||
+ // Cloud metadata endpoints
+ hostname === "metadata.google.internal" ||
+ // IPv6 link-local, unique-local, and IPv4-mapped
+ hostname.startsWith("fe80:") ||
+ hostname.startsWith("fc00:") ||
+ hostname.startsWith("fd00:") ||
+ hostname.startsWith("::ffff:");
+
+ if (isBlocked) {
+ throw new Error("AI base URL must not point to internal or private addresses");
+ }
+}
+
/** Arguments for `streamCompletion`. */
interface StreamCompletionParams {
  /** Team whose AI configuration and rate-limit bucket are used. */
  teamId: string;
  /** Content of the "system" role message sent first. */
  systemPrompt: string;
  /** Content of the "user" role message sent after the system prompt. */
  userPrompt: string;
  /** Invoked once per streamed content delta, in arrival order. */
  onToken: (token: string) => void;
  /** Optional abort signal forwarded to the provider fetch. */
  signal?: AbortSignal;
}
+
+function decryptApiKey(encryptedKey: string): string {
+ if (encryptedKey.startsWith(ENCRYPTED_PREFIX)) {
+ return decrypt(encryptedKey.slice(ENCRYPTED_PREFIX.length));
+ }
+ return encryptedKey;
+}
+
+function getDefaultBaseUrl(provider: string | null): string {
+ switch (provider) {
+ case "anthropic":
+ return "https://api.anthropic.com/v1";
+ case "openai":
+ default:
+ return "https://api.openai.com/v1";
+ }
+}
+
+export async function getTeamAiConfig(teamId: string, { requireEnabled = true } = {}) {
+ const team = await prisma.team.findUnique({
+ where: { id: teamId },
+ select: {
+ aiEnabled: true,
+ aiProvider: true,
+ aiBaseUrl: true,
+ aiApiKey: true,
+ aiModel: true,
+ },
+ });
+
+ if (!team) throw new Error("Team not found");
+ if (requireEnabled && !team.aiEnabled) throw new Error("AI is not enabled for this team");
+ if (!team.aiApiKey) throw new Error("AI API key is not configured");
+
+ return {
+ provider: team.aiProvider ?? "openai",
+ baseUrl: team.aiBaseUrl || getDefaultBaseUrl(team.aiProvider),
+ apiKey: decryptApiKey(team.aiApiKey),
+ model: team.aiModel ?? "gpt-4o",
+ };
+}
+
+export async function streamCompletion({
+ teamId,
+ systemPrompt,
+ userPrompt,
+ onToken,
+ signal,
+}: StreamCompletionParams): Promise {
+ const rateLimit = checkRateLimit(teamId);
+ if (!rateLimit.allowed) {
+ throw new Error(
+ `Rate limit exceeded. Resets at ${new Date(rateLimit.resetAt).toISOString()}`
+ );
+ }
+
+ const config = await getTeamAiConfig(teamId);
+ validateBaseUrl(config.baseUrl);
+
+ const response = await fetch(`${config.baseUrl}/chat/completions`, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ Authorization: `Bearer ${config.apiKey}`,
+ },
+ body: JSON.stringify({
+ model: config.model,
+ stream: true,
+ messages: [
+ { role: "system", content: systemPrompt },
+ { role: "user", content: userPrompt },
+ ],
+ }),
+ signal,
+ });
+
+ if (!response.ok) {
+ const errorText = await response.text().catch(() => "Unknown error");
+ throw new Error(`AI provider error (${response.status}): ${errorText}`);
+ }
+
+ if (!response.body) {
+ throw new Error("No response body from AI provider");
+ }
+
+ const reader = response.body.getReader();
+ const decoder = new TextDecoder();
+ let buffer = "";
+
+ try {
+ while (true) {
+ const { done, value } = await reader.read();
+ if (done) break;
+
+ buffer += decoder.decode(value, { stream: true });
+ const lines = buffer.split("\n");
+ buffer = lines.pop() ?? "";
+
+ for (const line of lines) {
+ const trimmed = line.trim();
+ if (!trimmed || !trimmed.startsWith("data: ")) continue;
+ const data = trimmed.slice(6);
+ if (data === "[DONE]") return;
+
+ try {
+ const parsed = JSON.parse(data);
+ const content = parsed.choices?.[0]?.delta?.content;
+ if (content) {
+ onToken(content);
+ }
+ } catch {
+ // Skip malformed SSE lines
+ }
+ }
+ }
+ } finally {
+ reader.releaseLock();
+ }
+}
+
+export async function testAiConnection(teamId: string): Promise<{ ok: boolean; error?: string }> {
+ try {
+ const config = await getTeamAiConfig(teamId, { requireEnabled: false });
+ validateBaseUrl(config.baseUrl);
+
+ const response = await fetch(`${config.baseUrl}/chat/completions`, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ Authorization: `Bearer ${config.apiKey}`,
+ },
+ body: JSON.stringify({
+ model: config.model,
+ max_tokens: 5,
+ messages: [{ role: "user", content: "Say hi" }],
+ }),
+ });
+
+ if (!response.ok) {
+ const errorText = await response.text().catch(() => "Unknown error");
+ return { ok: false, error: `${response.status}: ${errorText}` };
+ }
+
+ return { ok: true };
+ } catch (err) {
+ return { ok: false, error: err instanceof Error ? err.message : String(err) };
+ }
+}