Skip to content

Commit 069f8bd

Browse files
committed
feat(copilot): add copilot_chat_messages table with dual-write rollout
Introduces a dedicated `copilot_chat_messages` table to replace the JSONB messages array on `copilot_chats`. The JSONB column is preserved as the source of truth during rollout, with dual-writes capturing every new message in both places. Schema (additive only): - New `copilot_chat_messages` table with FK to `copilot_chats` (ON DELETE CASCADE), unique (chat_id, message_id), partial indexes for hot reads - Migration runs an inline `INSERT ... SELECT jsonb_array_elements ...` backfill, idempotent via ON CONFLICT — self-hosters need no scripts Dual-write helper: - `lib/copilot/chat/messages-dual-write.ts` — best-effort append + replace helpers, errors logged but not thrown so JSONB write remains canonical Dual-write callsites (every place that mutates copilot_chats.messages): - `lib/copilot/chat/post.ts` — user message append - `lib/copilot/chat/terminal-state.ts` — assistant message append (writes after the FOR UPDATE transaction commits) - `lib/mothership/inbox/executor.ts` — inbox-derived message persistence - `app/api/mothership/chats/[chatId]/fork/route.ts` — forked transcript - `app/api/copilot/chat/update-messages/route.ts` — snapshot replace Catch-up sweep: - `lib/copilot/chat/messages-catchup.ts` — bounded 7-day sweep on server boot to close the rolling-deploy window where old code may write to JSONB before the new dual-write code is live everywhere
1 parent 1631b36 commit 069f8bd

12 files changed

Lines changed: 17151 additions & 0 deletions

File tree

apps/sim/app/api/copilot/chat/update-messages/route.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { type NextRequest, NextResponse } from 'next/server'
66
import { updateCopilotMessagesContract } from '@/lib/api/contracts/copilot'
77
import { parseRequest } from '@/lib/api/server'
88
import { getAccessibleCopilotChatAuth } from '@/lib/copilot/chat/lifecycle'
9+
import { replaceCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write'
910
import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message'
1011
import {
1112
authenticateCopilotRequestSessionOnly,
@@ -87,6 +88,7 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
8788
}
8889

8990
await db.update(copilotChats).set(updateData).where(eq(copilotChats.id, chatId))
91+
await replaceCopilotChatMessages(chatId, normalizedMessages)
9092

9193
logger.info(`[${tracker.requestId}] Successfully updated chat`, {
9294
chatId,

apps/sim/app/api/mothership/chats/[chatId]/fork/route.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { eq } from 'drizzle-orm'
66
import { type NextRequest, NextResponse } from 'next/server'
77
import { forkMothershipChatContract } from '@/lib/api/contracts/mothership-tasks'
88
import { parseRequest } from '@/lib/api/server'
9+
import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write'
910
import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message'
1011
import { fetchGo } from '@/lib/copilot/request/go/fetch'
1112
import {
@@ -102,6 +103,8 @@ export const POST = withRouteHandler(
102103
return createInternalServerErrorResponse('Failed to create forked chat')
103104
}
104105

106+
await appendCopilotChatMessages(newId, forkedMessages, { chatModel: parent.model })
107+
105108
// Clone copilot-service conversation state (messages, active_messages, memory files).
106109
// Best-effort: if the copilot service doesn't have a row for the source chat yet, skip.
107110
try {

apps/sim/instrumentation-node.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,4 +312,12 @@ export async function register() {
312312

313313
const { startMemoryTelemetry } = await import('./lib/monitoring/memory-telemetry')
314314
startMemoryTelemetry()
315+
316+
// Fire-and-forget catch-up sweep: bounded to the last 7 days, idempotent,
317+
// runs in the background so server boot isn't blocked.
318+
void import('./lib/copilot/chat/messages-catchup')
319+
.then((mod) => mod.catchUpCopilotChatMessages())
320+
.catch((err) => {
321+
logger.warn('Failed to schedule copilot chat messages catch-up', err)
322+
})
315323
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import { db } from '@sim/db'
2+
import { createLogger } from '@sim/logger'
3+
import { getErrorMessage } from '@sim/utils/errors'
4+
import { sql } from 'drizzle-orm'
5+
6+
const logger = createLogger('CopilotChatMessagesCatchup')
7+
8+
/**
9+
* Sweep recently-active chats from `copilot_chats.messages` JSONB into the new
10+
* `copilot_chat_messages` table. Idempotent via `ON CONFLICT DO NOTHING`.
11+
*
12+
* Bounded to the last 7 days of activity so the cost is bounded regardless of
13+
* total table size. The migration handles initial backfill; this sweep only
14+
* exists to close the rolling-deploy window where old code may write to JSONB
15+
* before the new dual-write code is live on every server.
16+
*/
17+
export async function catchUpCopilotChatMessages(): Promise<void> {
18+
try {
19+
const result = await db.execute(sql`
20+
INSERT INTO copilot_chat_messages (chat_id, message_id, role, content, model, created_at, updated_at)
21+
SELECT
22+
c.id,
23+
COALESCE(msg.value->>'id', gen_random_uuid()::text),
24+
COALESCE(msg.value->>'role', 'user'),
25+
msg.value,
26+
COALESCE(msg.value->>'model', c.model),
27+
COALESCE(
28+
NULLIF(msg.value->>'createdAt','')::timestamp,
29+
c.created_at + (msg.ord * interval '1 microsecond')
30+
),
31+
COALESCE(
32+
NULLIF(msg.value->>'createdAt','')::timestamp,
33+
c.created_at + (msg.ord * interval '1 microsecond')
34+
)
35+
FROM copilot_chats c
36+
CROSS JOIN LATERAL jsonb_array_elements(c.messages) WITH ORDINALITY AS msg(value, ord)
37+
WHERE c.updated_at > now() - interval '7 days'
38+
AND jsonb_typeof(c.messages) = 'array'
39+
AND jsonb_array_length(c.messages) > 0
40+
ON CONFLICT (chat_id, message_id) DO NOTHING
41+
`)
42+
logger.info('Copilot chat messages catch-up completed', {
43+
rowCount: (result as { rowCount?: number }).rowCount ?? 0,
44+
})
45+
} catch (err) {
46+
logger.warn('Copilot chat messages catch-up failed', { error: getErrorMessage(err) })
47+
}
48+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import { db } from '@sim/db'
2+
import { copilotChatMessages } from '@sim/db/schema'
3+
import { createLogger } from '@sim/logger'
4+
import { getErrorMessage } from '@sim/utils/errors'
5+
import { generateShortId } from '@sim/utils/id'
6+
import { eq, sql } from 'drizzle-orm'
7+
8+
const logger = createLogger('CopilotChatMessagesDualWrite')
9+
10+
/**
11+
* Build a row payload for `copilot_chat_messages` from a JSONB message blob.
12+
* The blob format mirrors what's stored in the legacy `copilot_chats.messages`
13+
* array — `id`, `role`, and optionally `model`/`createdAt` at the top level —
14+
* with the entire blob preserved as the row's `content` for forward compat.
15+
*/
16+
function toMessageRow(
17+
chatId: string,
18+
rawMessage: unknown,
19+
options?: { chatModel?: string | null; streamId?: string | null }
20+
): typeof copilotChatMessages.$inferInsert | null {
21+
if (!rawMessage || typeof rawMessage !== 'object') return null
22+
const msg = rawMessage as Record<string, unknown>
23+
const id = typeof msg.id === 'string' && msg.id.length > 0 ? msg.id : generateShortId()
24+
const role = typeof msg.role === 'string' ? msg.role : 'user'
25+
const model =
26+
typeof msg.model === 'string' && msg.model.length > 0 ? msg.model : (options?.chatModel ?? null)
27+
return {
28+
chatId,
29+
messageId: id,
30+
role,
31+
content: msg,
32+
model,
33+
streamId: options?.streamId ?? null,
34+
}
35+
}
36+
37+
/**
38+
* Append messages to the new `copilot_chat_messages` table. Best-effort —
39+
* errors are logged but never thrown, since the legacy `copilot_chats.messages`
40+
* JSONB column remains the source of truth during the dual-write rollout.
41+
*/
42+
export async function appendCopilotChatMessages(
43+
chatId: string,
44+
messages: unknown[],
45+
options?: { chatModel?: string | null; streamId?: string | null }
46+
): Promise<void> {
47+
if (!Array.isArray(messages) || messages.length === 0) return
48+
49+
const rows = messages
50+
.map((msg) => toMessageRow(chatId, msg, options))
51+
.filter((row): row is typeof copilotChatMessages.$inferInsert => row !== null)
52+
53+
if (rows.length === 0) return
54+
55+
try {
56+
await db
57+
.insert(copilotChatMessages)
58+
.values(rows)
59+
.onConflictDoUpdate({
60+
target: [copilotChatMessages.chatId, copilotChatMessages.messageId],
61+
set: {
62+
content: sql`excluded.content`,
63+
role: sql`excluded.role`,
64+
model: sql`excluded.model`,
65+
updatedAt: sql`now()`,
66+
},
67+
})
68+
} catch (err) {
69+
logger.warn('Failed to append copilot chat messages', {
70+
chatId,
71+
messageCount: rows.length,
72+
error: getErrorMessage(err),
73+
})
74+
}
75+
}
76+
77+
/**
78+
* Replace all messages for a chat. Used by the update-messages endpoint that
79+
* receives a full snapshot of the conversation state. Best-effort.
80+
*/
81+
export async function replaceCopilotChatMessages(
82+
chatId: string,
83+
messages: unknown[],
84+
options?: { chatModel?: string | null }
85+
): Promise<void> {
86+
if (!Array.isArray(messages)) return
87+
88+
const rows = messages
89+
.map((msg) => toMessageRow(chatId, msg, options))
90+
.filter((row): row is typeof copilotChatMessages.$inferInsert => row !== null)
91+
92+
try {
93+
await db.transaction(async (tx) => {
94+
await tx.delete(copilotChatMessages).where(eq(copilotChatMessages.chatId, chatId))
95+
if (rows.length > 0) {
96+
await tx.insert(copilotChatMessages).values(rows)
97+
}
98+
})
99+
} catch (err) {
100+
logger.warn('Failed to replace copilot chat messages', {
101+
chatId,
102+
messageCount: rows.length,
103+
error: getErrorMessage(err),
104+
})
105+
}
106+
}

apps/sim/lib/copilot/chat/post.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { z } from 'zod'
1010
import { isZodError, validationErrorResponse } from '@/lib/api/server'
1111
import { getSession } from '@/lib/auth'
1212
import { type ChatLoadResult, resolveOrCreateChat } from '@/lib/copilot/chat/lifecycle'
13+
import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write'
1314
import { buildCopilotRequestPayload } from '@/lib/copilot/chat/payload'
1415
import {
1516
buildPersistedAssistantMessage,
@@ -332,6 +333,10 @@ async function persistUserMessage(params: {
332333
.where(eq(copilotChats.id, chatId))
333334
.returning({ messages: copilotChats.messages })
334335

336+
if (updated) {
337+
await appendCopilotChatMessages(chatId, [userMsg], { streamId: userMessageId })
338+
}
339+
335340
const messagesAfter = Array.isArray(updated?.messages) ? updated.messages : undefined
336341
span.setAttributes({
337342
[TraceAttr.ChatPersistOutcome]: updated

apps/sim/lib/copilot/chat/terminal-state.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { db } from '@sim/db'
22
import { copilotChats } from '@sim/db/schema'
33
import { and, eq, sql } from 'drizzle-orm'
4+
import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write'
45
import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message'
56
import { CopilotChatFinalizeOutcome } from '@/lib/copilot/generated/trace-attribute-values-v1'
67
import { TraceAttr } from '@/lib/copilot/generated/trace-attributes-v1'
@@ -47,6 +48,7 @@ export async function finalizeAssistantTurn({
4748
[TraceAttr.ChatHasAssistantMessage]: !!assistantMessage,
4849
},
4950
async (span) => {
51+
let appendedAssistantMessage: PersistedMessage | undefined
5052
const result = await db.transaction(async (tx) => {
5153
const where = userId
5254
? and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId))
@@ -113,6 +115,7 @@ export async function finalizeAssistantTurn({
113115
messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`,
114116
})
115117
.where(updateWhere)
118+
appendedAssistantMessage = assistantMessage
116119
return {
117120
found: true,
118121
updated: true,
@@ -148,6 +151,12 @@ export async function finalizeAssistantTurn({
148151
}
149152
})
150153

154+
if (appendedAssistantMessage) {
155+
await appendCopilotChatMessages(chatId, [appendedAssistantMessage], {
156+
streamId: userMessageId,
157+
})
158+
}
159+
151160
span.setAttribute(TraceAttr.ChatFinalizeOutcome, result.outcome)
152161
return result
153162
}

apps/sim/lib/mothership/inbox/executor.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { getErrorMessage } from '@sim/utils/errors'
44
import { generateId } from '@sim/utils/id'
55
import { and, eq, sql } from 'drizzle-orm'
66
import { resolveOrCreateChat } from '@/lib/copilot/chat/lifecycle'
7+
import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write'
78
import { buildIntegrationToolSchemas } from '@/lib/copilot/chat/payload'
89
import {
910
buildPersistedAssistantMessage,
@@ -350,6 +351,7 @@ async function persistChatMessages(
350351
updatedAt: new Date(),
351352
})
352353
.where(eq(copilotChats.id, chatId))
354+
await appendCopilotChatMessages(chatId, [userMessage, assistantMessage])
353355
} catch (err) {
354356
logger.warn('Failed to persist chat messages', {
355357
chatId,
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
CREATE TABLE "copilot_chat_messages" (
2+
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
3+
"chat_id" uuid NOT NULL,
4+
"message_id" text NOT NULL,
5+
"role" text NOT NULL,
6+
"content" jsonb NOT NULL,
7+
"stream_id" text,
8+
"parent_message_id" text,
9+
"model" text,
10+
"tokens_in" integer,
11+
"tokens_out" integer,
12+
"deleted_at" timestamp,
13+
"created_at" timestamp DEFAULT now() NOT NULL,
14+
"updated_at" timestamp DEFAULT now() NOT NULL
15+
);
16+
--> statement-breakpoint
17+
ALTER TABLE "copilot_chat_messages" ADD CONSTRAINT "copilot_chat_messages_chat_id_copilot_chats_id_fk" FOREIGN KEY ("chat_id") REFERENCES "public"."copilot_chats"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
18+
CREATE UNIQUE INDEX "copilot_chat_messages_chat_message_unique" ON "copilot_chat_messages" USING btree ("chat_id","message_id");--> statement-breakpoint
19+
CREATE INDEX "copilot_chat_messages_chat_created_at_idx" ON "copilot_chat_messages" USING btree ("chat_id","created_at","id") WHERE "copilot_chat_messages"."deleted_at" IS NULL;--> statement-breakpoint
20+
CREATE INDEX "copilot_chat_messages_chat_stream_idx" ON "copilot_chat_messages" USING btree ("chat_id","stream_id") WHERE "copilot_chat_messages"."stream_id" IS NOT NULL;--> statement-breakpoint
21+
INSERT INTO "copilot_chat_messages" (
22+
"chat_id", "message_id", "role", "content", "model", "created_at", "updated_at"
23+
)
24+
SELECT
25+
c."id",
26+
COALESCE(msg.value->>'id', gen_random_uuid()::text),
27+
COALESCE(msg.value->>'role', 'user'),
28+
msg.value,
29+
COALESCE(msg.value->>'model', c."model"),
30+
COALESCE(
31+
NULLIF(msg.value->>'createdAt','')::timestamp,
32+
c."created_at" + (msg.ord * interval '1 microsecond')
33+
),
34+
COALESCE(
35+
NULLIF(msg.value->>'createdAt','')::timestamp,
36+
c."created_at" + (msg.ord * interval '1 microsecond')
37+
)
38+
FROM "copilot_chats" c
39+
CROSS JOIN LATERAL jsonb_array_elements(c."messages") WITH ORDINALITY AS msg(value, ord)
40+
WHERE jsonb_typeof(c."messages") = 'array'
41+
AND jsonb_array_length(c."messages") > 0
42+
ON CONFLICT ("chat_id", "message_id") DO NOTHING;

0 commit comments

Comments
 (0)