Skip to content

Commit 8f78f2c

Browse files
waleedlatif1 and claude committed
feat(copilot): add seq ordinal to copilot_messages for order-preserving reads
copilot_messages had no column preserving message order: created_at (set from each message's timestamp) ties at millisecond granularity in 58% of chats, and some chats have out-of-order timestamps within their array. The only other tiebreaker, id, is a random UUID — so ORDER BY created_at, id renders same-timestamp user/assistant pairs swapped. This blocks the R+1 read cutover.

Add an integer seq = the message's 0-based index within the chat's JSONB array (ground-truth order), backfilled inline in migration 0219 (so neither self-hosters nor we need to run a separate script). Reads will use ORDER BY seq NULLS LAST, created_at, id at cutover; reads still come from JSONB after this PR.

Design:
- seq leads the sort but is not the sole sort key: created_at and id remain as tiebreakers (concurrent-append/NULL safety).
- Nullable now; defer NOT NULL so rolling-deploy old pods don't fail inserts.
- replace (update-messages snapshot) overwrites seq = array index (re-densifies after a mid-conversation delete); append preserves existing seq via COALESCE and assigns base+idx, where base = MAX(seq)+1 comes from a single MAX(seq) read (never MAX+i in SQL — multi-row batches would collide). The non-atomic read-then-insert window is documented and bounded by the read tiebreak + snapshot re-densify.
- Dedupe message ids before insert (87 prod chats carry dup ids; a repeated id in one INSERT...ON CONFLICT would otherwise throw).
- Backfill picks first-occurrence per (chat,id), gap-free via ROW_NUMBER; validated on staging data (0-based, contiguous, 0 bad ranges).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 6598927 commit 8f78f2c

7 files changed

Lines changed: 17667 additions & 20 deletions

File tree

apps/sim/lib/copilot/chat/messages-dual-write.test.ts

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ const assistantMsg: PersistedMessage = {
2626
timestamp: '2026-01-01T00:00:01.000Z',
2727
}
2828

29+
/** The first arg passed to the most recent `.values(...)` call. */
30+
function lastValuesRows() {
31+
const calls = dbChainMockFns.values.mock.calls
32+
return calls[calls.length - 1][0] as Array<Record<string, unknown>>
33+
}
34+
2935
describe('messages-dual-write', () => {
3036
beforeEach(() => {
3137
vi.clearAllMocks()
@@ -43,7 +49,7 @@ describe('messages-dual-write', () => {
4349

4450
expect(dbChainMockFns.insert).toHaveBeenCalledTimes(1)
4551
expect(dbChainMockFns.values).toHaveBeenCalledTimes(1)
46-
const rows = dbChainMockFns.values.mock.calls[0][0]
52+
const rows = lastValuesRows()
4753
expect(rows).toHaveLength(2)
4854

4955
expect(rows[0]).toMatchObject({
@@ -54,22 +60,38 @@ describe('messages-dual-write', () => {
5460
model: null,
5561
streamId: null,
5662
})
57-
expect(rows[0].createdAt).toEqual(new Date(userMsg.timestamp))
58-
expect(rows[0].updatedAt).toEqual(new Date(userMsg.timestamp))
63+
expect(rows[0].createdAt as Date).toEqual(new Date(userMsg.timestamp))
64+
expect(rows[0].updatedAt as Date).toEqual(new Date(userMsg.timestamp))
5965

6066
expect(rows[1]).toMatchObject({
6167
chatId: 'chat-1',
6268
messageId: 'msg-asst-1',
6369
role: 'assistant',
6470
content: assistantMsg,
6571
})
66-
expect(rows[1].createdAt).toEqual(new Date(assistantMsg.timestamp))
72+
expect(rows[1].createdAt as Date).toEqual(new Date(assistantMsg.timestamp))
6773
})
6874

69-
it('preserves per-message ordering via timestamp', async () => {
75+
it('assigns seq as 0-based array index when the chat has no prior rows', async () => {
76+
// MAX(seq) over a chat with no rows returns NULL in Postgres; mock the
77+
// select().from().where() chain explicitly so the base=0 path is not
78+
// relying on the chain mock's implicit empty-array default.
79+
dbChainMockFns.where.mockResolvedValueOnce([{ maxSeq: null }])
80+
7081
await appendCopilotChatMessages('chat-1', [userMsg, assistantMsg])
71-
const rows = dbChainMockFns.values.mock.calls[0][0]
72-
expect(rows[0].createdAt.getTime()).toBeLessThan(rows[1].createdAt.getTime())
82+
const rows = lastValuesRows()
83+
expect(rows[0].seq).toBe(0)
84+
expect(rows[1].seq).toBe(1)
85+
})
86+
87+
it('continues seq from MAX(seq)+1 when the chat already has rows', async () => {
88+
// MAX(seq) read resolves through the select().from().where() chain.
89+
dbChainMockFns.where.mockResolvedValueOnce([{ maxSeq: 4 }])
90+
91+
await appendCopilotChatMessages('chat-1', [userMsg, assistantMsg])
92+
const rows = lastValuesRows()
93+
expect(rows[0].seq).toBe(5)
94+
expect(rows[1].seq).toBe(6)
7395
})
7496

7597
it('passes chatModel and streamId options to every row', async () => {
@@ -78,14 +100,14 @@ describe('messages-dual-write', () => {
78100
streamId: 'stream-xyz',
79101
})
80102

81-
const rows = dbChainMockFns.values.mock.calls[0][0]
103+
const rows = lastValuesRows()
82104
expect(rows[0].model).toBe('claude-sonnet-4-5')
83105
expect(rows[0].streamId).toBe('stream-xyz')
84106
expect(rows[1].model).toBe('claude-sonnet-4-5')
85107
expect(rows[1].streamId).toBe('stream-xyz')
86108
})
87109

88-
it('uses ON CONFLICT DO UPDATE with chat_id + message_id target', async () => {
110+
it('uses ON CONFLICT DO UPDATE that PRESERVES existing seq', async () => {
89111
await appendCopilotChatMessages('chat-1', [userMsg])
90112

91113
expect(dbChainMockFns.onConflictDoUpdate).toHaveBeenCalledTimes(1)
@@ -96,6 +118,15 @@ describe('messages-dual-write', () => {
96118
expect(conflictArg.set).toHaveProperty('model')
97119
expect(conflictArg.set).toHaveProperty('streamId')
98120
expect(conflictArg.set).toHaveProperty('updatedAt')
121+
// append must not renumber existing rows -> COALESCE(existing, excluded)
122+
expect(conflictArg.set.seq.strings.join('')).toContain('COALESCE(')
123+
})
124+
125+
it('collapses duplicate message ids to a single row', async () => {
126+
await appendCopilotChatMessages('chat-1', [userMsg, { ...userMsg, content: 'dupe' }])
127+
const rows = lastValuesRows()
128+
expect(rows).toHaveLength(1)
129+
expect(rows[0].messageId).toBe('msg-user-1')
99130
})
100131

101132
it('swallows DB errors so the legacy JSONB write stays canonical', async () => {
@@ -120,25 +151,43 @@ describe('messages-dual-write', () => {
120151
expect(dbChainMockFns.delete).toHaveBeenCalledTimes(1)
121152
expect(dbChainMockFns.insert).toHaveBeenCalledTimes(1)
122153

123-
const rows = dbChainMockFns.values.mock.calls[0][0]
154+
const rows = lastValuesRows()
124155
expect(rows).toHaveLength(2)
125-
expect(rows.map((r: { messageId: string }) => r.messageId)).toEqual([
126-
'msg-user-1',
127-
'msg-asst-1',
128-
])
156+
expect(rows.map((r) => r.messageId)).toEqual(['msg-user-1', 'msg-asst-1'])
129157

130158
expect(dbChainMockFns.onConflictDoUpdate).toHaveBeenCalledTimes(1)
131159
const conflictArg = dbChainMockFns.onConflictDoUpdate.mock.calls[0][0]
132160
expect(conflictArg.set).toHaveProperty('streamId')
133161
expect(conflictArg.set).toHaveProperty('model')
134162
})
135163

164+
it('assigns seq as the snapshot array index (0-based)', async () => {
165+
await replaceCopilotChatMessages('chat-1', [userMsg, assistantMsg])
166+
const rows = lastValuesRows()
167+
expect(rows[0].seq).toBe(0)
168+
expect(rows[1].seq).toBe(1)
169+
})
170+
171+
it('OVERWRITES seq on conflict so positions re-densify after a delete', async () => {
172+
await replaceCopilotChatMessages('chat-1', [userMsg])
173+
const conflictArg = dbChainMockFns.onConflictDoUpdate.mock.calls[0][0]
174+
// authoritative snapshot -> seq = excluded.seq (no COALESCE preserve)
175+
expect(conflictArg.set.seq.strings.join('')).toBe('excluded.seq')
176+
})
177+
178+
it('collapses duplicate message ids to a single row', async () => {
179+
await replaceCopilotChatMessages('chat-1', [userMsg, { ...userMsg, content: 'dupe' }])
180+
const rows = lastValuesRows()
181+
expect(rows).toHaveLength(1)
182+
expect(rows[0].seq).toBe(0)
183+
})
184+
136185
it('passes chatModel to every row in the snapshot', async () => {
137186
await replaceCopilotChatMessages('chat-1', [userMsg], {
138187
chatModel: 'gpt-4o-mini',
139188
})
140189

141-
const rows = dbChainMockFns.values.mock.calls[0][0]
190+
const rows = lastValuesRows()
142191
expect(rows[0].model).toBe('gpt-4o-mini')
143192
})
144193

apps/sim/lib/copilot/chat/messages-dual-write.ts

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,27 @@ import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message'
77

88
const logger = createLogger('CopilotMessagesDualWrite')
99

10+
/**
11+
* Collapse duplicate message ids (keeping the first occurrence), preserving
12+
* order. A single `INSERT ... ON CONFLICT` cannot touch the same conflict
13+
* target twice, so a snapshot/array carrying a repeated id would otherwise
14+
* throw "ON CONFLICT DO UPDATE command cannot affect row a second time".
15+
*/
16+
function dedupeById(messages: PersistedMessage[]): PersistedMessage[] {
17+
const seen = new Set<string>()
18+
const out: PersistedMessage[] = []
19+
for (const m of messages) {
20+
if (seen.has(m.id)) continue
21+
seen.add(m.id)
22+
out.push(m)
23+
}
24+
return out
25+
}
26+
1027
function toRow(
1128
chatId: string,
1229
message: PersistedMessage,
30+
seq: number,
1331
options?: { chatModel?: string | null; streamId?: string | null }
1432
): typeof copilotMessages.$inferInsert {
1533
const ts = new Date(message.timestamp)
@@ -18,6 +36,7 @@ function toRow(
1836
messageId: message.id,
1937
role: message.role,
2038
content: message,
39+
seq,
2140
model: options?.chatModel ?? null,
2241
streamId: options?.streamId ?? null,
2342
createdAt: ts,
@@ -29,6 +48,19 @@ function toRow(
2948
* Append messages to the new `copilot_messages` table. Best-effort — errors
3049
* are logged but never thrown, since the legacy `copilot_chats.messages`
3150
* JSONB column remains the source of truth during the dual-write rollout.
51+
*
52+
* Ordering: `seq` is computed as `MAX(seq) + 1 + array index` from a non-atomic
53+
* read-then-insert. Two appends to the same chat that interleave (both reading
54+
* the same `MAX(seq)` before either inserts) can assign overlapping `seq` to
55+
* distinct new messages — the `COALESCE` conflict guard only protects re-sends
56+
* of an existing `(chat_id, message_id)`, not genuinely new rows. This is an
57+
* accepted, bounded window, not a correctness hole, because both read sides
58+
* cover it: the cutover read order is `seq, created_at, id` (so a `seq` tie
59+
* still resolves deterministically by time then id), and `replaceCopilotChatMessages`
60+
* re-densifies `seq` from the authoritative JSONB order on the next snapshot
61+
* save. A per-chat lock would close the window but is unwarranted for a
62+
* best-effort shadow write — copilot turns for a single chat are effectively
63+
* serialized (one stream at a time), so true concurrent appends are rare.
3264
*/
3365
export async function appendCopilotChatMessages(
3466
chatId: string,
@@ -37,16 +69,28 @@ export async function appendCopilotChatMessages(
3769
): Promise<void> {
3870
if (messages.length === 0) return
3971
try {
72+
const deduped = dedupeById(messages)
73+
// Assign seq as base + array index. Base is computed in JS from a single
74+
// MAX(seq) read — never `MAX(seq)+i` in SQL, since a multi-row INSERT
75+
// evaluates every row's subquery against the same pre-insert state and the
76+
// rows would collide. Existing rows keep their seq on conflict (COALESCE);
77+
// the authoritative renumber happens via replaceCopilotChatMessages.
78+
const [maxRow] = await db
79+
.select({ maxSeq: sql<number | null>`max(${copilotMessages.seq})` })
80+
.from(copilotMessages)
81+
.where(eq(copilotMessages.chatId, chatId))
82+
const base = (maxRow?.maxSeq ?? -1) + 1
4083
await db
4184
.insert(copilotMessages)
42-
.values(messages.map((m) => toRow(chatId, m, options)))
85+
.values(deduped.map((m, i) => toRow(chatId, m, base + i, options)))
4386
.onConflictDoUpdate({
4487
target: [copilotMessages.chatId, copilotMessages.messageId],
4588
set: {
4689
content: sql`excluded.content`,
4790
role: sql`excluded.role`,
4891
model: sql`COALESCE(excluded.model, ${copilotMessages.model})`,
4992
streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`,
93+
seq: sql`COALESCE(${copilotMessages.seq}, excluded.seq)`,
5094
updatedAt: sql`now()`,
5195
},
5296
})
@@ -69,7 +113,8 @@ export async function replaceCopilotChatMessages(
69113
options?: { chatModel?: string | null }
70114
): Promise<void> {
71115
try {
72-
const newMessageIds = messages.map((m) => m.id)
116+
const deduped = dedupeById(messages)
117+
const newMessageIds = deduped.map((m) => m.id)
73118
await db.transaction(async (tx) => {
74119
// Drop rows for messages not in the new snapshot.
75120
await tx
@@ -82,19 +127,22 @@ export async function replaceCopilotChatMessages(
82127
)
83128
: eq(copilotMessages.chatId, chatId)
84129
)
85-
if (messages.length === 0) return
86-
// Upsert remaining rows. ON CONFLICT preserves existing stream_id / model
130+
if (deduped.length === 0) return
131+
// Upsert remaining rows. The snapshot is authoritative on order, so seq is
132+
// overwritten with the array index — this re-densifies positions after a
133+
// mid-conversation delete. ON CONFLICT preserves existing stream_id / model
87134
// so a snapshot save doesn't clobber metadata set during streaming.
88135
await tx
89136
.insert(copilotMessages)
90-
.values(messages.map((m) => toRow(chatId, m, options)))
137+
.values(deduped.map((m, i) => toRow(chatId, m, i, options)))
91138
.onConflictDoUpdate({
92139
target: [copilotMessages.chatId, copilotMessages.messageId],
93140
set: {
94141
content: sql`excluded.content`,
95142
role: sql`excluded.role`,
96143
model: sql`COALESCE(excluded.model, ${copilotMessages.model})`,
97144
streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`,
145+
seq: sql`excluded.seq`,
98146
updatedAt: sql`now()`,
99147
},
100148
})
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
ALTER TABLE "copilot_messages" ADD COLUMN "seq" integer;--> statement-breakpoint
2+
WITH ordered AS (
3+
SELECT c."id" AS chat_id, elem.value->>'id' AS message_id, elem.ord AS ord
4+
FROM "copilot_chats" c
5+
CROSS JOIN LATERAL jsonb_array_elements(c."messages") WITH ORDINALITY AS elem(value, ord)
6+
WHERE jsonb_typeof(c."messages") = 'array' AND jsonb_array_length(c."messages") > 0
7+
),
8+
first_occurrence AS (
9+
SELECT chat_id, message_id, MIN(ord) AS first_ord FROM ordered GROUP BY chat_id, message_id
10+
),
11+
ranked AS (
12+
SELECT chat_id, message_id,
13+
(ROW_NUMBER() OVER (PARTITION BY chat_id ORDER BY first_ord) - 1) AS seq
14+
FROM first_occurrence
15+
)
16+
UPDATE "copilot_messages" m SET "seq" = r.seq
17+
FROM ranked r
18+
WHERE m."chat_id" = r.chat_id AND m."message_id" = r.message_id;--> statement-breakpoint
19+
CREATE INDEX "copilot_messages_chat_seq_idx" ON "copilot_messages" USING btree ("chat_id","seq") WHERE "copilot_messages"."deleted_at" IS NULL;

0 commit comments

Comments
 (0)