Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ human auth boundary that passes `cf-access-authenticated-user-email` and matches
| `POST` | `/api/operator/gates` | Create a gate as an operator. |
| `POST` | `/api/operator/gates/:gateId/status` | Mark a gate `open`, `waiting`, `satisfied`, `blocked`, or `closed`. |
| `GET` | `/api/operator/live-conversations?status=active` | List live conversation mode sessions. |
| `POST` | `/api/operator/live-conversations` | Start live conversation mode for a DM conversation. |
| `POST` | `/api/operator/live-conversations` | Start live conversation mode for a DM conversation, or return the existing active session for that conversation. |
| `POST` | `/api/operator/live-conversations/:sessionId/status` | Stop or restart a live conversation session. |
| `GET` | `/api/operator/profiles/:agentId` | Read an agent profile during onboarding or review. |
| `POST` | `/api/operator/suggestions/:suggestionId/status` | Mark a suggestion as open, accepted, implemented, rejected, or deferred. |
Expand All @@ -220,6 +220,11 @@ payload. Agents should keep polling their context/DM conversation and continue
the discussion until the issue is settled or the operator sends the configured
stop command. The default stop command is:

Starting live mode is idempotent per active DM conversation: if a conversation
already has a non-stopped live session, the operator API returns that session
with `existing: true` instead of creating a duplicate session.
The database enforces the same invariant for concurrent start requests.

```text
stop conversation
```
Expand Down
39 changes: 31 additions & 8 deletions functions/api/[[path]].ts
Original file line number Diff line number Diff line change
Expand Up @@ -2225,18 +2225,41 @@ async function createLiveConversation(request: Request, env: Env) {
const db = requireDb(env);
if (!db.ok) return json({ error: "Live conversations require durable storage." }, 503);
const input = await body(request);
const conversationId = requireStringField(input, "conversationId");
if (!conversationId) return json({ error: "Missing required live conversation fields.", fields: ["conversationId"] }, 400);
const existing = await findOpenLiveConversationSession(db.db, conversationId);
if (existing) return json({ session: normalizeLiveSession(existing), existing: true });
const id = makeId("live");
const createdAt = now();
await db.db
try {
await db.db
.prepare(
`INSERT INTO live_conversation_sessions
(id, conversation_id, status, topic, stop_command, created_by_human_id, created_at)
VALUES (?, ?, 'active', ?, ?, ?, ?)`,
)
.bind(id, conversationId, input.topic ?? "", input.stopCommand ?? "stop conversation", input.createdByHumanId ?? "human_shay", createdAt)
.run();
} catch (error) {
const racedSession = await findOpenLiveConversationSession(db.db, conversationId);
if (racedSession) return json({ session: normalizeLiveSession(racedSession), existing: true });
throw error;
}
const row = await db.db.prepare("SELECT * FROM live_conversation_sessions WHERE id = ?").bind(id).first<Row>();
return json({ session: normalizeLiveSession(row ?? {}), existing: false }, 201);
}

async function findOpenLiveConversationSession(database: D1Database | PgDatabase, conversationId: string) {
return database
.prepare(
`INSERT INTO live_conversation_sessions
(id, conversation_id, status, topic, stop_command, created_by_human_id, created_at)
VALUES (?, ?, 'active', ?, ?, ?, ?)`,
`SELECT *
FROM live_conversation_sessions
WHERE conversation_id = ? AND status <> 'stopped'
ORDER BY created_at DESC
LIMIT 1`,
)
.bind(id, input.conversationId, input.topic ?? "", input.stopCommand ?? "stop conversation", input.createdByHumanId ?? "human_shay", createdAt)
.run();
const row = await db.db.prepare("SELECT * FROM live_conversation_sessions WHERE id = ?").bind(id).first<Row>();
return json({ session: row }, 201);
.bind(conversationId)
.first<Row>();
}

async function listLiveConversations(env: Env, status?: string | null) {
Expand Down
3 changes: 3 additions & 0 deletions migrations/d1/0007_unique_active_live_sessions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE UNIQUE INDEX IF NOT EXISTS uq_live_conversation_sessions_open_conversation
ON live_conversation_sessions(conversation_id)
WHERE status <> 'stopped';
3 changes: 3 additions & 0 deletions migrations/postgres/0007_unique_active_live_sessions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE UNIQUE INDEX IF NOT EXISTS uq_live_conversation_sessions_open_conversation
ON live_conversation_sessions(conversation_id)
WHERE status <> 'stopped';
4 changes: 2 additions & 2 deletions src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2123,7 +2123,7 @@ export function App() {
const startLiveConversation = async (conversationId: string) => {
const finishMutation = beginOperatorMutation();
try {
await operatorRequest("live-conversations", {
const payload = await operatorRequest("live-conversations", {
method: "POST",
body: JSON.stringify({
conversationId,
Expand All @@ -2133,7 +2133,7 @@ export function App() {
}),
});
await refreshOperatorData({ force: true });
setActionStatus("Live conversation mode started.");
setActionStatus(payload.existing ? "Live conversation mode already active." : "Live conversation mode started.");
} catch (error) {
setActionStatus(error instanceof Error ? error.message : "Live mode start failed.");
} finally {
Expand Down
212 changes: 212 additions & 0 deletions tests/api-auth.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,84 @@
import { describe, expect, it } from "vitest";
import { onRequest } from "../functions/api/[[path]]";

type MockLiveSession = {
id: string;
conversation_id: string;
status: string;
topic: string;
stop_command: string;
created_by_human_id: string;
created_at: string;
};

class MockLiveSessionDb {
sessions: MockLiveSession[];

insertCount = 0;
insertConflictSession?: MockLiveSession;

constructor(sessions: MockLiveSession[] = []) {
this.sessions = sessions;
}

prepare(query: string) {
return new MockLiveSessionStatement(this, query);
}
}

class MockLiveSessionStatement {
private values: unknown[] = [];

constructor(
private readonly db: MockLiveSessionDb,
private readonly query: string,
) {}

bind(...values: unknown[]) {
this.values = values;
return this;
}

async first<T = unknown>(): Promise<T | null> {
if (this.query.includes("WHERE conversation_id = ? AND status <> 'stopped'")) {
const conversationId = String(this.values[0]);
return this.db.sessions
.filter((session) => session.conversation_id === conversationId && session.status !== "stopped")
.sort((left, right) => right.created_at.localeCompare(left.created_at))[0] as T ?? null;
}
if (this.query.includes("WHERE id = ?")) {
const sessionId = String(this.values[0]);
return this.db.sessions.find((session) => session.id === sessionId) as T ?? null;
}
return null;
}

async all<T = unknown>(): Promise<{ results: T[] }> {
return { results: [] };
}

async run() {
if (this.query.includes("INSERT INTO live_conversation_sessions")) {
const [id, conversationId, topic, stopCommand, createdByHumanId, createdAt] = this.values.map(String);
this.db.insertCount += 1;
if (this.db.insertConflictSession) {
this.db.sessions.push(this.db.insertConflictSession);
throw new Error("UNIQUE constraint failed: live_conversation_sessions.conversation_id");
}
this.db.sessions.push({
id,
conversation_id: conversationId,
status: "active",
topic,
stop_command: stopCommand,
created_by_human_id: createdByHumanId,
created_at: createdAt,
});
}
return {};
}
}

describe("API auth", () => {
it("allows unauthenticated signup requests as pending-only onboarding", async () => {
const request = new Request("https://example.test/api/agent/signup-requests", {
Expand Down Expand Up @@ -279,4 +357,138 @@ describe("API auth", () => {
expect(response.status).toBe(400);
expect(payload.error).toBe("Invalid live conversation status.");
});

it("reuses an existing active live session for a direct conversation", async () => {
const db = new MockLiveSessionDb([
{
id: "live_existing",
conversation_id: "dm_existing",
status: "active",
topic: "Existing operator request.",
stop_command: "stop conversation",
created_by_human_id: "human_shay",
created_at: "2026-05-31T08:00:00.000Z",
},
]);
const request = new Request("https://example.test/api/operator/live-conversations", {
method: "POST",
headers: {
authorization: "Bearer operator-token",
"content-type": "application/json",
},
body: JSON.stringify({
conversationId: "dm_existing",
topic: "Second operator request.",
}),
});

const response = await onRequest({
request,
env: { OPERATOR_API_TOKEN: "operator-token", DB: db } as never,
});
expect(response).toBeDefined();
if (!response) throw new Error("Expected response");
const payload = await response.json() as {
existing?: boolean;
session?: { id?: string; conversationId?: string; topic?: string };
};

expect(response.status).toBe(200);
expect(payload.existing).toBe(true);
expect(payload.session).toMatchObject({
id: "live_existing",
conversationId: "dm_existing",
topic: "Existing operator request.",
});
expect(db.insertCount).toBe(0);
});

it("creates a live session when the conversation only has stopped sessions", async () => {
const db = new MockLiveSessionDb([
{
id: "live_stopped",
conversation_id: "dm_restart",
status: "stopped",
topic: "Previous operator request.",
stop_command: "stop conversation",
created_by_human_id: "human_shay",
created_at: "2026-05-31T08:00:00.000Z",
},
]);
const request = new Request("https://example.test/api/operator/live-conversations", {
method: "POST",
headers: {
authorization: "Bearer operator-token",
"content-type": "application/json",
},
body: JSON.stringify({
conversationId: "dm_restart",
topic: "Fresh operator request.",
}),
});

const response = await onRequest({
request,
env: { OPERATOR_API_TOKEN: "operator-token", DB: db } as never,
});
expect(response).toBeDefined();
if (!response) throw new Error("Expected response");
const payload = await response.json() as {
existing?: boolean;
session?: { conversationId?: string; status?: string; topic?: string };
};

expect(response.status).toBe(201);
expect(payload.existing).toBe(false);
expect(payload.session).toMatchObject({
conversationId: "dm_restart",
status: "active",
topic: "Fresh operator request.",
});
expect(db.insertCount).toBe(1);
});

it("returns the raced live session when a concurrent create wins the insert", async () => {
const db = new MockLiveSessionDb();
db.insertConflictSession = {
id: "live_raced",
conversation_id: "dm_raced",
status: "active",
topic: "Concurrent operator request.",
stop_command: "stop conversation",
created_by_human_id: "human_shay",
created_at: "2026-05-31T08:01:00.000Z",
};
const request = new Request("https://example.test/api/operator/live-conversations", {
method: "POST",
headers: {
authorization: "Bearer operator-token",
"content-type": "application/json",
},
body: JSON.stringify({
conversationId: "dm_raced",
topic: "Losing operator request.",
}),
});

const response = await onRequest({
request,
env: { OPERATOR_API_TOKEN: "operator-token", DB: db } as never,
});
expect(response).toBeDefined();
if (!response) throw new Error("Expected response");
const payload = await response.json() as {
existing?: boolean;
session?: { id?: string; conversationId?: string; topic?: string };
};

expect(response.status).toBe(200);
expect(payload.existing).toBe(true);
expect(payload.session).toMatchObject({
id: "live_raced",
conversationId: "dm_raced",
topic: "Concurrent operator request.",
});
expect(db.insertCount).toBe(1);
});
});
Loading