Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
740abc5
feat: add TypeScript types for structured AI suggestions
TerrifiedBug Mar 10, 2026
9ce60a2
feat: add AiConversation and AiMessage models for persistent AI revie…
TerrifiedBug Mar 10, 2026
4c8e116
feat: add pure functions for applying AI suggestions to flow state
TerrifiedBug Mar 10, 2026
20fe227
feat: add conflict detection for selected AI suggestions
TerrifiedBug Mar 10, 2026
a6903c6
refactor: extend streamCompletion to accept conversation message history
TerrifiedBug Mar 10, 2026
432394a
feat: add AI suggestion validation and outdated detection utilities
TerrifiedBug Mar 10, 2026
c129973
feat: rewrite AI review prompt to return structured JSON suggestions
TerrifiedBug Mar 10, 2026
3bf2cf9
feat: add tRPC AI router for conversation CRUD and suggestion tracking
TerrifiedBug Mar 10, 2026
561a413
feat: add batch applySuggestions action to flow store with single und…
TerrifiedBug Mar 10, 2026
b06f91b
feat: add conversation persistence and message history to AI pipeline…
TerrifiedBug Mar 10, 2026
d343ed4
feat: add AI suggestion card component with status states and conflic…
TerrifiedBug Mar 10, 2026
c00253b
feat: add useAiConversation hook for conversation management and stre…
TerrifiedBug Mar 10, 2026
c3e9e1a
feat: add AI message bubble with suggestion cards, selection, and bat…
TerrifiedBug Mar 10, 2026
aef7de4
feat: rewrite AI review dialog with conversation thread and actionabl…
TerrifiedBug Mar 10, 2026
6768de4
docs: update AI suggestions guide with actionable review cards
TerrifiedBug Mar 10, 2026
a151e7a
fix: resolve Prisma Json type errors and lint issues in AI suggestion…
TerrifiedBug Mar 10, 2026
c83d37c
fix: resolve lint warnings in AI conversation components
TerrifiedBug Mar 10, 2026
227d529
fix: hash bearer tokens in WS auth cache to prevent credential exposure
TerrifiedBug Mar 11, 2026
d4e75ae
fix: reset poll ticker when receiving poll_interval from server
TerrifiedBug Mar 11, 2026
2631f82
fix: resolve race condition in agent samples error path
TerrifiedBug Mar 11, 2026
3ba3b10
fix: close authorization gaps in AI conversation endpoints
TerrifiedBug Mar 11, 2026
bd86fb4
fix: prevent ghost undo entry and use withAudit middleware for sugges…
TerrifiedBug Mar 11, 2026
8e6cabd
fix: resolve 3 AI conversation state sync bugs
TerrifiedBug Mar 11, 2026
6a57f49
Merge remote-tracking branch 'origin/main' into ai-agent-improvements…
TerrifiedBug Mar 11, 2026
26ec9b9
fix: new conversation refetch loop and dot-notation config paths
TerrifiedBug Mar 11, 2026
45d85c2
fix: wrap markSuggestionsApplied in transaction to prevent lost updates
TerrifiedBug Mar 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion agent/internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func (a *Agent) Run() error {
}()

// Main loop: poll + heartbeat
ticker := time.NewTicker(a.cfg.PollInterval)
currentInterval := a.cfg.PollInterval
ticker := time.NewTicker(currentInterval)
defer ticker.Stop()

// Do first poll immediately
Expand All @@ -107,6 +108,7 @@ func (a *Agent) Run() error {
}

a.sendHeartbeat()
currentInterval = a.maybeResetTicker(ticker, currentInterval)

for {
select {
Expand All @@ -124,6 +126,7 @@ func (a *Agent) Run() error {
case <-ticker.C:
a.pollAndApply()
a.sendHeartbeat()
currentInterval = a.maybeResetTicker(ticker, currentInterval)
case msg := <-a.wsCh:
a.handleWsMessage(msg, ticker)
case <-a.immediateHeartbeatCh:
Expand All @@ -132,6 +135,22 @@ func (a *Agent) Run() error {
}
}

// maybeResetTicker checks if the server provided a new poll interval and resets
// the ticker if it changed. Returns the (possibly updated) current interval.
func (a *Agent) maybeResetTicker(ticker *time.Ticker, current time.Duration) time.Duration {
serverMs := a.poller.PollIntervalMs()
if serverMs <= 0 {
return current
}
serverInterval := time.Duration(serverMs) * time.Millisecond
if serverInterval != current {
slog.Info("poll interval updated by server", "old", current, "new", serverInterval)
ticker.Reset(serverInterval)
return serverInterval
}
return current
}

func (a *Agent) pollAndApply() {
actions, err := a.poller.Poll()
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions agent/internal/agent/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type poller struct {
known map[string]pipelineState // pipelineId -> last known state
sampleRequests []client.SampleRequestMsg
pendingAction *client.PendingAction
pollIntervalMs int // server-provided poll interval from last response
websocketUrl string
}

Expand Down Expand Up @@ -190,6 +191,9 @@ func (p *poller) Poll() ([]PipelineAction, error) {
// Store pending action (e.g. self-update) for the agent to handle
p.pendingAction = resp.PendingAction

// Store server-provided poll interval
p.pollIntervalMs = resp.PollIntervalMs

// Store websocket URL for the agent to use
if resp.WebSocketURL != "" {
p.websocketUrl = resp.WebSocketURL
Expand All @@ -212,6 +216,11 @@ func (p *poller) PendingAction() *client.PendingAction {
return p.pendingAction
}

// PollIntervalMs returns the server-provided poll interval from the last response.
func (p *poller) PollIntervalMs() int {
return p.pollIntervalMs
}

// WebSocketURL returns the WebSocket URL from the last config response.
func (p *poller) WebSocketURL() string {
p.mu.Lock()
Expand Down
39 changes: 38 additions & 1 deletion docs/public/user-guide/ai-suggestions.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,44 @@ Ask the AI to analyze your current pipeline configuration:

> "Are there any performance issues with my pipeline?"

The AI reviews the generated YAML and provides suggestions for improvements, best practices, and potential issues.
The AI returns structured, actionable suggestion cards that you can selectively apply to your canvas.

#### Suggestion cards

Each suggestion appears as an interactive card showing:

- **Title** and **description** explaining why the change helps
- **Priority badge** (High, Medium, Low)
- **Type badge** — Config Change, Add Component, Remove Component, or Rewire
- **Checkbox** for batch selection
- **Config preview** for configuration changes showing the exact fields that will be modified

#### Applying suggestions

- **Apply All** — applies every actionable suggestion from that AI response
- **Apply Selected** — applies only the suggestions you have checked

Applied suggestions are marked with a green "Applied" badge and cannot be re-applied. The entire batch is a single undo operation — press **Ctrl+Z** (or **Cmd+Z**) to revert all changes at once.

#### Conflict detection

When you select multiple suggestions that conflict (e.g., two suggestions modifying the same config field, or one removing a component that another references), an amber warning appears on the affected cards explaining the conflict. You can still apply conflicting suggestions, but review the warnings first.

#### Suggestion statuses

| Status | Meaning |
|--------|---------|
| **Actionable** | Ready to apply |
| **Applied** | Already applied to the canvas |
| **Outdated** | The pipeline changed since this suggestion was made |
| **Invalid** | References a component that no longer exists on the canvas |

#### Conversations

Review conversations are persistent — they are saved per pipeline and visible to all team members with access. You can:

- **Ask follow-up questions** using the input at the bottom of the dialog
- **Start a new conversation** by clicking "New Conversation" below the input

## Rate Limits

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
-- CreateTable
CREATE TABLE "AiConversation" (
"id" TEXT NOT NULL,
"pipelineId" TEXT NOT NULL,
"createdById" TEXT,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,

CONSTRAINT "AiConversation_pkey" PRIMARY KEY ("id")
);

-- CreateTable
CREATE TABLE "AiMessage" (
"id" TEXT NOT NULL,
"conversationId" TEXT NOT NULL,
"role" TEXT NOT NULL,
"content" TEXT NOT NULL,
"suggestions" JSONB,
"pipelineYaml" TEXT,
"createdById" TEXT,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,

CONSTRAINT "AiMessage_pkey" PRIMARY KEY ("id")
);

-- CreateIndex
CREATE INDEX "AiConversation_pipelineId_createdAt_idx" ON "AiConversation"("pipelineId", "createdAt");

-- CreateIndex
CREATE INDEX "AiMessage_conversationId_createdAt_idx" ON "AiMessage"("conversationId", "createdAt");

-- AddForeignKey
ALTER TABLE "AiConversation" ADD CONSTRAINT "AiConversation_pipelineId_fkey" FOREIGN KEY ("pipelineId") REFERENCES "Pipeline"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "AiConversation" ADD CONSTRAINT "AiConversation_createdById_fkey" FOREIGN KEY ("createdById") REFERENCES "User"("id") ON DELETE SET NULL ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "AiMessage" ADD CONSTRAINT "AiMessage_conversationId_fkey" FOREIGN KEY ("conversationId") REFERENCES "AiConversation"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "AiMessage" ADD CONSTRAINT "AiMessage_createdById_fkey" FOREIGN KEY ("createdById") REFERENCES "User"("id") ON DELETE SET NULL ON UPDATE CASCADE;
31 changes: 31 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ model User {
deployRequestsReviewed DeployRequest[] @relation("deployReviewer")
deployRequestsExecuted DeployRequest[] @relation("deployExecutor")
preferences UserPreference[]
aiConversationsCreated AiConversation[] @relation("AiConversationCreatedBy")
aiMessagesCreated AiMessage[] @relation("AiMessageCreatedBy")
createdAt DateTime @default(now())
}

Expand Down Expand Up @@ -277,6 +279,7 @@ model Pipeline {
slis PipelineSli[]
enrichMetadata Boolean @default(false)
tags Json? @default("[]") // string[] of classification tags like ["PII", "PCI-DSS"]
aiConversations AiConversation[]
deployRequests DeployRequest[]
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
Expand Down Expand Up @@ -753,3 +756,31 @@ model ServiceAccount {
@@index([hashedKey])
@@index([environmentId])
}

model AiConversation {
id String @id @default(cuid())
pipelineId String
pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade)
createdById String?
createdBy User? @relation("AiConversationCreatedBy", fields: [createdById], references: [id], onDelete: SetNull)
messages AiMessage[]
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

@@index([pipelineId, createdAt])
}

model AiMessage {
id String @id @default(cuid())
conversationId String
conversation AiConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade)
role String // "user" | "assistant"
content String
suggestions Json?
pipelineYaml String?
createdById String?
createdBy User? @relation("AiMessageCreatedBy", fields: [createdById], references: [id], onDelete: SetNull)
createdAt DateTime @default(now())

@@index([conversationId, createdAt])
}
1 change: 1 addition & 0 deletions src/app/(dashboard)/pipelines/[id]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ function PipelineBuilderInner({ pipelineId }: { pipelineId: string }) {
<AiPipelineDialog
open={aiDialogOpen}
onOpenChange={setAiDialogOpen}
pipelineId={pipelineId}
environmentName={pipelineQuery.data?.environment?.name}
/>
)}
Expand Down
57 changes: 39 additions & 18 deletions src/app/api/agent/heartbeat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,25 +362,46 @@ export async function POST(request: Request) {
});
if (!request || request.status !== "PENDING") continue;

await prisma.eventSample.create({
data: {
requestId: result.requestId,
pipelineId: request.pipelineId,
componentKey: result.componentKey ?? "",
events: (result.events ?? []) as Prisma.InputJsonValue,
schema: (result.schema ?? []) as Prisma.InputJsonValue,
error: result.error ?? null,
},
});
try {
await prisma.eventSample.create({
data: {
requestId: result.requestId,
pipelineId: request.pipelineId,
componentKey: result.componentKey ?? "",
events: (result.events ?? []) as Prisma.InputJsonValue,
schema: (result.schema ?? []) as Prisma.InputJsonValue,
error: result.error ?? null,
},
});

await prisma.eventSampleRequest.update({
where: { id: result.requestId },
data: {
status: result.error ? "ERROR" : "COMPLETED",
completedAt: new Date(),
nodeId: agent.nodeId,
},
});
await prisma.eventSampleRequest.update({
where: { id: result.requestId },
data: {
status: result.error ? "ERROR" : "COMPLETED",
completedAt: new Date(),
nodeId: agent.nodeId,
},
});
} catch (err) {
// Only mark as ERROR if the EventSample write itself failed.
// If another agent already submitted a successful result, the
// request may already be COMPLETED — avoid overwriting that.
const current = await prisma.eventSampleRequest.findUnique({
where: { id: result.requestId },
select: { status: true },
});
if (current && current.status === "PENDING") {
await prisma.eventSampleRequest.update({
where: { id: result.requestId },
data: {
status: "ERROR",
completedAt: new Date(),
nodeId: agent.nodeId,
},
});
}
console.error("EventSample write error:", err);
}
}
}

Expand Down
Loading
Loading