Skip to content

Commit fd77bb4

Browse files
waleedlatif1claude
andauthored
feat(copilot): add seq ordinal to copilot_messages for order-preserving reads (#4791)
copilot_messages had no column preserving message order: created_at (set from each message's timestamp) ties at millisecond granularity in 58% of chats, and some chats have out-of-order timestamps within their array. The only other tiebreaker, id, is a random UUID — so ORDER BY created_at, id renders same-timestamp user/assistant pairs swapped. This blocks the R+1 read cutover. Add an integer seq = the message's 0-based index within the chat's JSONB array (ground-truth order), backfilled inline in migration 0219 (no script for self-hosters or us). Reads will use ORDER BY seq NULLS LAST, created_at, id at cutover; reads still come from JSONB after this PR. Design: - seq is a tiebreaker, not the sole sort key (concurrent-append/NULL safety). - Nullable now; defer NOT NULL so rolling-deploy old pods don't fail inserts. - replace (update-messages snapshot) overwrites seq = array index (re-densifies after a mid-conversation delete); append preserves existing seq via COALESCE and assigns base+idx from a single MAX(seq) read (never MAX+i in SQL — multi-row batches would collide). The non-atomic read-then-insert window is documented and bounded by the read tiebreak + snapshot re-densify. - Dedupe message ids before insert (87 prod chats carry dup ids; a repeated id in one INSERT...ON CONFLICT would otherwise throw). - Backfill picks first-occurrence per (chat,id), gap-free via ROW_NUMBER; validated on staging data (0-based, contiguous, 0 bad ranges). Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent dc6073e commit fd77bb4

7 files changed

Lines changed: 17650 additions & 23 deletions

File tree

apps/sim/lib/copilot/chat/messages-dual-write.test.ts

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ const assistantMsg: PersistedMessage = {
2626
timestamp: '2026-01-01T00:00:01.000Z',
2727
}
2828

29+
/** The first arg passed to the most recent `.values(...)` call. */
30+
function lastValuesRows() {
31+
const calls = dbChainMockFns.values.mock.calls
32+
return calls[calls.length - 1][0] as Array<Record<string, unknown>>
33+
}
34+
2935
describe('messages-dual-write', () => {
3036
beforeEach(() => {
3137
vi.clearAllMocks()
@@ -43,7 +49,7 @@ describe('messages-dual-write', () => {
4349

4450
expect(dbChainMockFns.insert).toHaveBeenCalledTimes(1)
4551
expect(dbChainMockFns.values).toHaveBeenCalledTimes(1)
46-
const rows = dbChainMockFns.values.mock.calls[0][0]
52+
const rows = lastValuesRows()
4753
expect(rows).toHaveLength(2)
4854

4955
expect(rows[0]).toMatchObject({
@@ -54,22 +60,34 @@ describe('messages-dual-write', () => {
5460
model: null,
5561
streamId: null,
5662
})
57-
expect(rows[0].createdAt).toEqual(new Date(userMsg.timestamp))
58-
expect(rows[0].updatedAt).toEqual(new Date(userMsg.timestamp))
63+
expect(rows[0].createdAt as Date).toEqual(new Date(userMsg.timestamp))
64+
expect(rows[0].updatedAt as Date).toEqual(new Date(userMsg.timestamp))
5965

6066
expect(rows[1]).toMatchObject({
6167
chatId: 'chat-1',
6268
messageId: 'msg-asst-1',
6369
role: 'assistant',
6470
content: assistantMsg,
6571
})
66-
expect(rows[1].createdAt).toEqual(new Date(assistantMsg.timestamp))
72+
expect(rows[1].createdAt as Date).toEqual(new Date(assistantMsg.timestamp))
6773
})
6874

69-
it('preserves per-message ordering via timestamp', async () => {
75+
it('assigns seq as 0-based array index when the chat has no prior rows', async () => {
76+
dbChainMockFns.where.mockResolvedValueOnce([{ maxSeq: null }])
77+
7078
await appendCopilotChatMessages('chat-1', [userMsg, assistantMsg])
71-
const rows = dbChainMockFns.values.mock.calls[0][0]
72-
expect(rows[0].createdAt.getTime()).toBeLessThan(rows[1].createdAt.getTime())
79+
const rows = lastValuesRows()
80+
expect(rows[0].seq).toBe(0)
81+
expect(rows[1].seq).toBe(1)
82+
})
83+
84+
it('continues seq from MAX(seq)+1 when the chat already has rows', async () => {
85+
dbChainMockFns.where.mockResolvedValueOnce([{ maxSeq: 4 }])
86+
87+
await appendCopilotChatMessages('chat-1', [userMsg, assistantMsg])
88+
const rows = lastValuesRows()
89+
expect(rows[0].seq).toBe(5)
90+
expect(rows[1].seq).toBe(6)
7391
})
7492

7593
it('passes chatModel and streamId options to every row', async () => {
@@ -78,14 +96,14 @@ describe('messages-dual-write', () => {
7896
streamId: 'stream-xyz',
7997
})
8098

81-
const rows = dbChainMockFns.values.mock.calls[0][0]
99+
const rows = lastValuesRows()
82100
expect(rows[0].model).toBe('claude-sonnet-4-5')
83101
expect(rows[0].streamId).toBe('stream-xyz')
84102
expect(rows[1].model).toBe('claude-sonnet-4-5')
85103
expect(rows[1].streamId).toBe('stream-xyz')
86104
})
87105

88-
it('uses ON CONFLICT DO UPDATE with chat_id + message_id target', async () => {
106+
it('uses ON CONFLICT DO UPDATE that PRESERVES existing seq', async () => {
89107
await appendCopilotChatMessages('chat-1', [userMsg])
90108

91109
expect(dbChainMockFns.onConflictDoUpdate).toHaveBeenCalledTimes(1)
@@ -96,6 +114,14 @@ describe('messages-dual-write', () => {
96114
expect(conflictArg.set).toHaveProperty('model')
97115
expect(conflictArg.set).toHaveProperty('streamId')
98116
expect(conflictArg.set).toHaveProperty('updatedAt')
117+
expect(conflictArg.set.seq.strings.join('')).toContain('COALESCE(')
118+
})
119+
120+
it('collapses duplicate message ids to a single row', async () => {
121+
await appendCopilotChatMessages('chat-1', [userMsg, { ...userMsg, content: 'dupe' }])
122+
const rows = lastValuesRows()
123+
expect(rows).toHaveLength(1)
124+
expect(rows[0].messageId).toBe('msg-user-1')
99125
})
100126

101127
it('swallows DB errors so the legacy JSONB write stays canonical', async () => {
@@ -120,25 +146,42 @@ describe('messages-dual-write', () => {
120146
expect(dbChainMockFns.delete).toHaveBeenCalledTimes(1)
121147
expect(dbChainMockFns.insert).toHaveBeenCalledTimes(1)
122148

123-
const rows = dbChainMockFns.values.mock.calls[0][0]
149+
const rows = lastValuesRows()
124150
expect(rows).toHaveLength(2)
125-
expect(rows.map((r: { messageId: string }) => r.messageId)).toEqual([
126-
'msg-user-1',
127-
'msg-asst-1',
128-
])
151+
expect(rows.map((r) => r.messageId)).toEqual(['msg-user-1', 'msg-asst-1'])
129152

130153
expect(dbChainMockFns.onConflictDoUpdate).toHaveBeenCalledTimes(1)
131154
const conflictArg = dbChainMockFns.onConflictDoUpdate.mock.calls[0][0]
132155
expect(conflictArg.set).toHaveProperty('streamId')
133156
expect(conflictArg.set).toHaveProperty('model')
134157
})
135158

159+
it('assigns seq as the snapshot array index (0-based)', async () => {
160+
await replaceCopilotChatMessages('chat-1', [userMsg, assistantMsg])
161+
const rows = lastValuesRows()
162+
expect(rows[0].seq).toBe(0)
163+
expect(rows[1].seq).toBe(1)
164+
})
165+
166+
it('OVERWRITES seq on conflict so positions re-densify after a delete', async () => {
167+
await replaceCopilotChatMessages('chat-1', [userMsg])
168+
const conflictArg = dbChainMockFns.onConflictDoUpdate.mock.calls[0][0]
169+
expect(conflictArg.set.seq.strings.join('')).toBe('excluded.seq')
170+
})
171+
172+
it('collapses duplicate message ids to a single row', async () => {
173+
await replaceCopilotChatMessages('chat-1', [userMsg, { ...userMsg, content: 'dupe' }])
174+
const rows = lastValuesRows()
175+
expect(rows).toHaveLength(1)
176+
expect(rows[0].seq).toBe(0)
177+
})
178+
136179
it('passes chatModel to every row in the snapshot', async () => {
137180
await replaceCopilotChatMessages('chat-1', [userMsg], {
138181
chatModel: 'gpt-4o-mini',
139182
})
140183

141-
const rows = dbChainMockFns.values.mock.calls[0][0]
184+
const rows = lastValuesRows()
142185
expect(rows[0].model).toBe('gpt-4o-mini')
143186
})
144187

apps/sim/lib/copilot/chat/messages-dual-write.ts

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,26 @@ import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message'
77

88
const logger = createLogger('CopilotMessagesDualWrite')
99

10+
/**
11+
* Keep the first occurrence of each message id. A single `INSERT ... ON
12+
* CONFLICT` cannot touch the same conflict target twice, so a repeated id
13+
* would otherwise throw.
14+
*/
15+
function dedupeById(messages: PersistedMessage[]): PersistedMessage[] {
16+
const seen = new Set<string>()
17+
const out: PersistedMessage[] = []
18+
for (const m of messages) {
19+
if (seen.has(m.id)) continue
20+
seen.add(m.id)
21+
out.push(m)
22+
}
23+
return out
24+
}
25+
1026
function toRow(
1127
chatId: string,
1228
message: PersistedMessage,
29+
seq: number,
1330
options?: { chatModel?: string | null; streamId?: string | null }
1431
): typeof copilotMessages.$inferInsert {
1532
const ts = new Date(message.timestamp)
@@ -18,6 +35,7 @@ function toRow(
1835
messageId: message.id,
1936
role: message.role,
2037
content: message,
38+
seq,
2139
model: options?.chatModel ?? null,
2240
streamId: options?.streamId ?? null,
2341
createdAt: ts,
@@ -27,8 +45,15 @@ function toRow(
2745

2846
/**
2947
* Append messages to the new `copilot_messages` table. Best-effort — errors
30-
* are logged but never thrown, since the legacy `copilot_chats.messages`
31-
* JSONB column remains the source of truth during the dual-write rollout.
48+
* are logged but never thrown; the legacy `copilot_chats.messages` JSONB
49+
* column stays the source of truth during the dual-write rollout.
50+
*
51+
* `seq` is `MAX(seq) + index`, computed in JS (not in SQL, where every row of
52+
* a multi-row INSERT would read the same pre-insert MAX and collide). The
53+
* read-then-insert is non-atomic, so interleaved appends to one chat can tie
54+
* `seq`; that window is bounded by the cutover read order (`seq, created_at,
55+
* id`) and `replaceCopilotChatMessages`, which re-densifies `seq` from the
56+
* authoritative JSONB order on the next snapshot save.
3257
*/
3358
export async function appendCopilotChatMessages(
3459
chatId: string,
@@ -37,16 +62,23 @@ export async function appendCopilotChatMessages(
3762
): Promise<void> {
3863
if (messages.length === 0) return
3964
try {
65+
const deduped = dedupeById(messages)
66+
const [maxRow] = await db
67+
.select({ maxSeq: sql<number | null>`max(${copilotMessages.seq})` })
68+
.from(copilotMessages)
69+
.where(eq(copilotMessages.chatId, chatId))
70+
const base = (maxRow?.maxSeq ?? -1) + 1
4071
await db
4172
.insert(copilotMessages)
42-
.values(messages.map((m) => toRow(chatId, m, options)))
73+
.values(deduped.map((m, i) => toRow(chatId, m, base + i, options)))
4374
.onConflictDoUpdate({
4475
target: [copilotMessages.chatId, copilotMessages.messageId],
4576
set: {
4677
content: sql`excluded.content`,
4778
role: sql`excluded.role`,
4879
model: sql`COALESCE(excluded.model, ${copilotMessages.model})`,
4980
streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`,
81+
seq: sql`COALESCE(${copilotMessages.seq}, excluded.seq)`,
5082
updatedAt: sql`now()`,
5183
},
5284
})
@@ -69,7 +101,8 @@ export async function replaceCopilotChatMessages(
69101
options?: { chatModel?: string | null }
70102
): Promise<void> {
71103
try {
72-
const newMessageIds = messages.map((m) => m.id)
104+
const deduped = dedupeById(messages)
105+
const newMessageIds = deduped.map((m) => m.id)
73106
await db.transaction(async (tx) => {
74107
// Drop rows for messages not in the new snapshot.
75108
await tx
@@ -82,19 +115,20 @@ export async function replaceCopilotChatMessages(
82115
)
83116
: eq(copilotMessages.chatId, chatId)
84117
)
85-
if (messages.length === 0) return
86-
// Upsert remaining rows. ON CONFLICT preserves existing stream_id / model
87-
// so a snapshot save doesn't clobber metadata set during streaming.
118+
if (deduped.length === 0) return
119+
// Snapshot is authoritative on order, so seq = array index is overwritten
120+
// on conflict; stream_id / model are preserved via COALESCE.
88121
await tx
89122
.insert(copilotMessages)
90-
.values(messages.map((m) => toRow(chatId, m, options)))
123+
.values(deduped.map((m, i) => toRow(chatId, m, i, options)))
91124
.onConflictDoUpdate({
92125
target: [copilotMessages.chatId, copilotMessages.messageId],
93126
set: {
94127
content: sql`excluded.content`,
95128
role: sql`excluded.role`,
96129
model: sql`COALESCE(excluded.model, ${copilotMessages.model})`,
97130
streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`,
131+
seq: sql`excluded.seq`,
98132
updatedAt: sql`now()`,
99133
},
100134
})
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
ALTER TABLE "copilot_messages" ADD COLUMN "seq" integer;--> statement-breakpoint
2+
WITH ordered AS (
3+
SELECT c."id" AS chat_id, elem.value->>'id' AS message_id, elem.ord AS ord
4+
FROM "copilot_chats" c
5+
CROSS JOIN LATERAL jsonb_array_elements(c."messages") WITH ORDINALITY AS elem(value, ord)
6+
WHERE jsonb_typeof(c."messages") = 'array' AND jsonb_array_length(c."messages") > 0
7+
),
8+
first_occurrence AS (
9+
SELECT chat_id, message_id, MIN(ord) AS first_ord FROM ordered GROUP BY chat_id, message_id
10+
),
11+
ranked AS (
12+
SELECT chat_id, message_id,
13+
(ROW_NUMBER() OVER (PARTITION BY chat_id ORDER BY first_ord) - 1) AS seq
14+
FROM first_occurrence
15+
)
16+
UPDATE "copilot_messages" m SET "seq" = r.seq
17+
FROM ranked r
18+
WHERE m."chat_id" = r.chat_id AND m."message_id" = r.message_id;--> statement-breakpoint
19+
CREATE INDEX "copilot_messages_chat_seq_idx" ON "copilot_messages" USING btree ("chat_id","seq") WHERE "copilot_messages"."deleted_at" IS NULL;

0 commit comments

Comments
 (0)