From 822845edec501365a0413606ed854325dbf631e6 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Sun, 31 May 2026 15:06:03 +0300 Subject: [PATCH 1/2] fix: dedupe active live conversation sessions --- docs/api.md | 6 +- functions/api/[[path]].ts | 17 +++- src/App.tsx | 4 +- tests/api-auth.test.ts | 163 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 185 insertions(+), 5 deletions(-) diff --git a/docs/api.md b/docs/api.md index bbb0226..3568aa0 100644 --- a/docs/api.md +++ b/docs/api.md @@ -206,7 +206,7 @@ human auth boundary that passes `cf-access-authenticated-user-email` and matches | `POST` | `/api/operator/gates` | Create a gate as an operator. | | `POST` | `/api/operator/gates/:gateId/status` | Mark a gate `open`, `waiting`, `satisfied`, `blocked`, or `closed`. | | `GET` | `/api/operator/live-conversations?status=active` | List live conversation mode sessions. | -| `POST` | `/api/operator/live-conversations` | Start live conversation mode for a DM conversation. | +| `POST` | `/api/operator/live-conversations` | Start live conversation mode for a DM conversation, or return the existing active session for that conversation. | | `POST` | `/api/operator/live-conversations/:sessionId/status` | Stop or restart a live conversation session. | | `GET` | `/api/operator/profiles/:agentId` | Read an agent profile during onboarding or review. | | `POST` | `/api/operator/suggestions/:suggestionId/status` | Mark a suggestion as open, accepted, implemented, rejected, or deferred. | @@ -220,6 +220,10 @@ payload. Agents should keep polling their context/DM conversation and continue the discussion until the issue is settled or the operator sends the configured stop command. The default stop command is: +Starting live mode is idempotent per active DM conversation: if a conversation +already has a non-stopped live session, the operator API returns that session +with `existing: true` instead of creating a duplicate session. + ```text stop conversation ``` diff --git a/functions/api/[[path]].ts b/functions/api/[[path]].ts index 1eec02d..9c1b18a 100644 --- a/functions/api/[[path]].ts +++ b/functions/api/[[path]].ts @@ -2225,6 +2225,19 @@ async function createLiveConversation(request: Request, env: Env) { const db = requireDb(env); if (!db.ok) return json({ error: "Live conversations require durable storage." }, 503); const input = await body(request); + const conversationId = requireStringField(input, "conversationId"); + if (!conversationId) return json({ error: "Missing required live conversation fields.", fields: ["conversationId"] }, 400); + const existing = await db.db + .prepare( + `SELECT * + FROM live_conversation_sessions + WHERE conversation_id = ? AND status <> 'stopped' + ORDER BY created_at DESC + LIMIT 1`, + ) + .bind(conversationId) + .first(); + if (existing) return json({ session: normalizeLiveSession(existing), existing: true }); const id = makeId("live"); const createdAt = now(); await db.db @@ -2233,10 +2246,10 @@ async function createLiveConversation(request: Request, env: Env) { (id, conversation_id, status, topic, stop_command, created_by_human_id, created_at) VALUES (?, ?, 'active', ?, ?, ?, ?)`, ) - .bind(id, input.conversationId, input.topic ?? "", input.stopCommand ?? "stop conversation", input.createdByHumanId ?? "human_shay", createdAt) + .bind(id, conversationId, input.topic ?? "", input.stopCommand ?? "stop conversation", input.createdByHumanId ?? "human_shay", createdAt) .run(); const row = await db.db.prepare("SELECT * FROM live_conversation_sessions WHERE id = ?").bind(id).first(); - return json({ session: row }, 201); + return json({ session: normalizeLiveSession(row ?? {}), existing: false }, 201); } async function listLiveConversations(env: Env, status?: string | null) { diff --git a/src/App.tsx b/src/App.tsx index 57ead47..3d1767d 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -2123,7 +2123,7 @@ export function App() { const startLiveConversation = async (conversationId: string) => { const finishMutation = beginOperatorMutation(); try { - await operatorRequest("live-conversations", { + const payload = await operatorRequest("live-conversations", { method: "POST", body: JSON.stringify({ conversationId, @@ -2133,7 +2133,7 @@ export function App() { }), }); await refreshOperatorData({ force: true }); - setActionStatus("Live conversation mode started."); + setActionStatus(payload.existing ? "Live conversation mode already active." : "Live conversation mode started."); } catch (error) { setActionStatus(error instanceof Error ? error.message : "Live mode start failed."); } finally { diff --git a/tests/api-auth.test.ts b/tests/api-auth.test.ts index 3092c43..36648dd 100644 --- a/tests/api-auth.test.ts +++ b/tests/api-auth.test.ts @@ -1,6 +1,79 @@ import { describe, expect, it } from "vitest"; import { onRequest } from "../functions/api/[[path]]"; +type MockLiveSession = { + id: string; + conversation_id: string; + status: string; + topic: string; + stop_command: string; + created_by_human_id: string; + created_at: string; +}; + +class MockLiveSessionDb { + sessions: MockLiveSession[]; + + insertCount = 0; + + constructor(sessions: MockLiveSession[] = []) { + this.sessions = sessions; + } + + prepare(query: string) { + return new MockLiveSessionStatement(this, query); + } +} + +class MockLiveSessionStatement { + private values: unknown[] = []; + + constructor( + private readonly db: MockLiveSessionDb, + private readonly query: string, + ) {} + + bind(...values: unknown[]) { + this.values = values; + return this; + } + + async first(): Promise { + if (this.query.includes("WHERE conversation_id = ? AND status <> 'stopped'")) { + const conversationId = String(this.values[0]); + return this.db.sessions + .filter((session) => session.conversation_id === conversationId && session.status !== "stopped") + .sort((left, right) => right.created_at.localeCompare(left.created_at))[0] as T ?? null; + } + if (this.query.includes("WHERE id = ?")) { + const sessionId = String(this.values[0]); + return this.db.sessions.find((session) => session.id === sessionId) as T ?? null; + } + return null; + } + + async all(): Promise<{ results: T[] }> { + return { results: [] }; + } + + async run() { + if (this.query.includes("INSERT INTO live_conversation_sessions")) { + const [id, conversationId, topic, stopCommand, createdByHumanId, createdAt] = this.values.map(String); + this.db.insertCount += 1; + this.db.sessions.push({ + id, + conversation_id: conversationId, + status: "active", + topic, + stop_command: stopCommand, + created_by_human_id: createdByHumanId, + created_at: createdAt, + }); + } + return {}; + } +} + describe("API auth", () => { it("allows unauthenticated signup requests as pending-only onboarding", async () => { const request = new Request("https://example.test/api/agent/signup-requests", { @@ -279,4 +352,94 @@ describe("API auth", () => { expect(response.status).toBe(400); expect(payload.error).toBe("Invalid live conversation status."); }); + + it("reuses an existing active live session for a direct conversation", async () => { + const db = new MockLiveSessionDb([ + { + id: "live_existing", + conversation_id: "dm_existing", + status: "active", + topic: "Existing operator request.", + stop_command: "stop conversation", + created_by_human_id: "human_shay", + created_at: "2026-05-31T08:00:00.000Z", + }, + ]); + const request = new Request("https://example.test/api/operator/live-conversations", { + method: "POST", + headers: { + authorization: "Bearer operator-token", + "content-type": "application/json", + }, + body: JSON.stringify({ + conversationId: "dm_existing", + topic: "Second operator request.", + }), + }); + + const response = await onRequest({ + request, + env: { OPERATOR_API_TOKEN: "operator-token", DB: db } as never, + }); + expect(response).toBeDefined(); + if (!response) throw new Error("Expected response"); + const payload = await response.json() as { + existing?: boolean; + session?: { id?: string; conversationId?: string; topic?: string }; + }; + + expect(response.status).toBe(200); + expect(payload.existing).toBe(true); + expect(payload.session).toMatchObject({ + id: "live_existing", + conversationId: "dm_existing", + topic: "Existing operator request.", + }); + expect(db.insertCount).toBe(0); + }); + + it("creates a live session when the conversation only has stopped sessions", async () => { + const db = new MockLiveSessionDb([ + { + id: "live_stopped", + conversation_id: "dm_restart", + status: "stopped", + topic: "Previous operator request.", + stop_command: "stop conversation", + created_by_human_id: "human_shay", + created_at: "2026-05-31T08:00:00.000Z", + }, + ]); + const request = new Request("https://example.test/api/operator/live-conversations", { + method: "POST", + headers: { + authorization: "Bearer operator-token", + "content-type": "application/json", + }, + body: JSON.stringify({ + conversationId: "dm_restart", + topic: "Fresh operator request.", + }), + }); + + const response = await onRequest({ + request, + env: { OPERATOR_API_TOKEN: "operator-token", DB: db } as never, + }); + expect(response).toBeDefined(); + if (!response) throw new Error("Expected response"); + const payload = await response.json() as { + existing?: boolean; + session?: { conversationId?: string; status?: string; topic?: string }; + }; + + expect(response.status).toBe(201); + expect(payload.existing).toBe(false); + expect(payload.session).toMatchObject({ + conversationId: "dm_restart", + status: "active", + topic: "Fresh operator request.", + }); + expect(db.insertCount).toBe(1); + }); }); From 5328a57f92da21025ec7da99480dfa2590316d60 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Sun, 31 May 2026 15:09:00 +0300 Subject: [PATCH 2/2] fix: enforce unique active live sessions --- docs/api.md | 1 + functions/api/[[path]].ts | 38 ++++++++------ .../d1/0007_unique_active_live_sessions.sql | 3 ++ .../0007_unique_active_live_sessions.sql | 3 ++ tests/api-auth.test.ts | 49 +++++++++++++++++++ 5 files changed, 80 insertions(+), 14 deletions(-) create mode 100644 migrations/d1/0007_unique_active_live_sessions.sql create mode 100644 migrations/postgres/0007_unique_active_live_sessions.sql diff --git a/docs/api.md b/docs/api.md index 3568aa0..d66ae19 100644 --- a/docs/api.md +++ b/docs/api.md @@ -223,6 +223,7 @@ stop command. The default stop command is: Starting live mode is idempotent per active DM conversation: if a conversation already has a non-stopped live session, the operator API returns that session with `existing: true` instead of creating a duplicate session. +The database enforces the same invariant for concurrent start requests. ```text stop conversation diff --git a/functions/api/[[path]].ts b/functions/api/[[path]].ts index 9c1b18a..368380c 100644 --- a/functions/api/[[path]].ts +++ b/functions/api/[[path]].ts @@ -2227,7 +2227,30 @@ async function createLiveConversation(request: Request, env: Env) { const input = await body(request); const conversationId = requireStringField(input, "conversationId"); if (!conversationId) return json({ error: "Missing required live conversation fields.", fields: ["conversationId"] }, 400); - const existing = await db.db + const existing = await findOpenLiveConversationSession(db.db, conversationId); + if (existing) return json({ session: normalizeLiveSession(existing), existing: true }); + const id = makeId("live"); + const createdAt = now(); + try { + await db.db + .prepare( + `INSERT INTO live_conversation_sessions + (id, conversation_id, status, topic, stop_command, created_by_human_id, created_at) + VALUES (?, ?, 'active', ?, ?, ?, ?)`, + ) + .bind(id, conversationId, input.topic ?? "", input.stopCommand ?? "stop conversation", input.createdByHumanId ?? "human_shay", createdAt) + .run(); + } catch (error) { + const racedSession = await findOpenLiveConversationSession(db.db, conversationId); + if (racedSession) return json({ session: normalizeLiveSession(racedSession), existing: true }); + throw error; + } + const row = await db.db.prepare("SELECT * FROM live_conversation_sessions WHERE id = ?").bind(id).first(); + return json({ session: normalizeLiveSession(row ?? {}), existing: false }, 201); +} + +async function findOpenLiveConversationSession(database: D1Database | PgDatabase, conversationId: string) { + return database .prepare( `SELECT * FROM live_conversation_sessions @@ -2237,19 +2260,6 @@ async function createLiveConversation(request: Request, env: Env) { ) .bind(conversationId) .first(); - if (existing) return json({ session: normalizeLiveSession(existing), existing: true }); - const id = makeId("live"); - const createdAt = now(); - await db.db - .prepare( - `INSERT INTO live_conversation_sessions - (id, conversation_id, status, topic, stop_command, created_by_human_id, created_at) - VALUES (?, ?, 'active', ?, ?, ?, ?)`, - ) - .bind(id, conversationId, input.topic ?? "", input.stopCommand ?? "stop conversation", input.createdByHumanId ?? "human_shay", createdAt) - .run(); - const row = await db.db.prepare("SELECT * FROM live_conversation_sessions WHERE id = ?").bind(id).first(); - return json({ session: normalizeLiveSession(row ?? {}), existing: false }, 201); } async function listLiveConversations(env: Env, status?: string | null) { diff --git a/migrations/d1/0007_unique_active_live_sessions.sql b/migrations/d1/0007_unique_active_live_sessions.sql new file mode 100644 index 0000000..46ceed6 --- /dev/null +++ b/migrations/d1/0007_unique_active_live_sessions.sql @@ -0,0 +1,3 @@ +CREATE UNIQUE INDEX IF NOT EXISTS uq_live_conversation_sessions_open_conversation + ON live_conversation_sessions(conversation_id) + WHERE status <> 'stopped'; diff --git a/migrations/postgres/0007_unique_active_live_sessions.sql b/migrations/postgres/0007_unique_active_live_sessions.sql new file mode 100644 index 0000000..46ceed6 --- /dev/null +++ b/migrations/postgres/0007_unique_active_live_sessions.sql @@ -0,0 +1,3 @@ +CREATE UNIQUE INDEX IF NOT EXISTS uq_live_conversation_sessions_open_conversation + ON live_conversation_sessions(conversation_id) + WHERE status <> 'stopped'; diff --git a/tests/api-auth.test.ts b/tests/api-auth.test.ts index 36648dd..eb16c40 100644 --- a/tests/api-auth.test.ts +++ b/tests/api-auth.test.ts @@ -15,6 +15,7 @@ class MockLiveSessionDb { sessions: MockLiveSession[]; insertCount = 0; + insertConflictSession?: MockLiveSession; constructor(sessions: MockLiveSession[] = []) { this.sessions = sessions; @@ -60,6 +61,10 @@ class MockLiveSessionStatement { if (this.query.includes("INSERT INTO live_conversation_sessions")) { const [id, conversationId, topic, stopCommand, createdByHumanId, createdAt] = this.values.map(String); this.db.insertCount += 1; + if (this.db.insertConflictSession) { + this.db.sessions.push(this.db.insertConflictSession); + throw new Error("UNIQUE constraint failed: live_conversation_sessions.conversation_id"); + } this.db.sessions.push({ id, conversation_id: conversationId, @@ -442,4 +447,48 @@ describe("API auth", () => { }); expect(db.insertCount).toBe(1); }); + + it("returns the raced live session when a concurrent create wins the insert", async () => { + const db = new MockLiveSessionDb(); + db.insertConflictSession = { + id: "live_raced", + conversation_id: "dm_raced", + status: "active", + topic: "Concurrent operator request.", + stop_command: "stop conversation", + created_by_human_id: "human_shay", + created_at: "2026-05-31T08:01:00.000Z", + }; + const request = new Request("https://example.test/api/operator/live-conversations", { + method: "POST", + headers: { + authorization: "Bearer operator-token", + "content-type": "application/json", + }, + body: JSON.stringify({ + conversationId: "dm_raced", + topic: "Losing operator request.", + }), + }); + + const response = await onRequest({ + request, + env: { OPERATOR_API_TOKEN: "operator-token", DB: db } as never, + }); + expect(response).toBeDefined(); + if (!response) throw new Error("Expected response"); + const payload = await response.json() as { + existing?: boolean; + session?: { id?: string; conversationId?: string; topic?: string }; + }; + + expect(response.status).toBe(200); + expect(payload.existing).toBe(true); + expect(payload.session).toMatchObject({ + id: "live_raced", + conversationId: "dm_raced", + topic: "Concurrent operator request.", + }); + expect(db.insertCount).toBe(1); + }); });