Skip to content

Commit 77df623

Browse files
committed
Fix Composio session race and timeout cache
1 parent 520c9a1 commit 77df623

4 files changed

Lines changed: 155 additions & 19 deletions

File tree

sdk/src/__tests__/composio.test.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import { afterEach, describe, expect, mock, test } from 'bun:test'
2+
3+
import { getComposioCustomToolDefinitions } from '../composio'
4+
5+
describe('getComposioCustomToolDefinitions', () => {
6+
const originalFetch = globalThis.fetch
7+
8+
afterEach(() => {
9+
globalThis.fetch = originalFetch
10+
})
11+
12+
test('does not cache an empty tool list after discovery timeout', async () => {
13+
const apiKey = `timeout-key-${Date.now()}`
14+
const timeoutFetch = mock(
15+
async (_url: string | URL | Request, init?: RequestInit) => {
16+
const signal = init?.signal
17+
return new Promise<Response>((_resolve, reject) => {
18+
if (signal?.aborted) {
19+
reject(new Error('aborted'))
20+
return
21+
}
22+
23+
signal?.addEventListener(
24+
'abort',
25+
() => reject(new Error('aborted')),
26+
{ once: true },
27+
)
28+
})
29+
},
30+
)
31+
globalThis.fetch = timeoutFetch as unknown as typeof fetch
32+
33+
const timedOutTools = await getComposioCustomToolDefinitions({
34+
apiKey,
35+
logger: { warn: mock(() => {}) },
36+
})
37+
expect(timedOutTools).toEqual([])
38+
expect(timeoutFetch).toHaveBeenCalledTimes(1)
39+
40+
const successFetch = mock(async () => {
41+
return new Response(
42+
JSON.stringify({
43+
sessionId: 'session-123',
44+
tools: [
45+
{
46+
toolName: 'COMPOSIO_SEARCH_TOOLS',
47+
inputSchema: { type: 'object', properties: {} },
48+
description: 'Search tools',
49+
},
50+
],
51+
}),
52+
{ status: 200 },
53+
)
54+
})
55+
globalThis.fetch = successFetch as unknown as typeof fetch
56+
57+
const tools = await getComposioCustomToolDefinitions({
58+
apiKey,
59+
logger: { warn: mock(() => {}) },
60+
})
61+
62+
expect(successFetch).toHaveBeenCalledTimes(1)
63+
expect(tools).toHaveLength(1)
64+
expect(tools[0]?.toolName).toBe('COMPOSIO_SEARCH_TOOLS')
65+
})
66+
})

sdk/src/composio.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,13 +185,23 @@ export async function getComposioCustomToolDefinitions(params: {
185185
signal: discoverySignal.signal,
186186
})
187187
} catch (error) {
188-
if (!params.signal?.aborted) {
189-
cacheComposioTools(params.apiKey, [], COMPOSIO_DISCOVERY_FAILURE_CACHE_MS)
188+
if (params.signal?.aborted) {
189+
return []
190+
}
191+
192+
if (discoverySignal.signal.aborted) {
190193
params.logger?.warn(
191194
{ error: error instanceof Error ? error.message : String(error) },
192-
'Failed to fetch Composio tools',
195+
'Timed out fetching Composio tools',
193196
)
197+
return []
194198
}
199+
200+
cacheComposioTools(params.apiKey, [], COMPOSIO_DISCOVERY_FAILURE_CACHE_MS)
201+
params.logger?.warn(
202+
{ error: error instanceof Error ? error.message : String(error) },
203+
'Failed to fetch Composio tools',
204+
)
195205
return []
196206
} finally {
197207
discoverySignal.cleanup()

web/src/server/__tests__/composio.test.ts

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,19 +45,27 @@ describe('getComposioToolsForUser', () => {
4545
])
4646
})
4747

48-
function makeDb(storedSessionId: string | null) {
49-
const findFirst = mock(async () =>
50-
storedSessionId
48+
function makeDb(storedSessionIds: string | null | Array<string | null>) {
49+
const storedSessionIdSequence = Array.isArray(storedSessionIds)
50+
? [...storedSessionIds]
51+
: [storedSessionIds]
52+
const findFirst = mock(async () => {
53+
const storedSessionId =
54+
storedSessionIdSequence.length > 1
55+
? storedSessionIdSequence.shift()
56+
: storedSessionIdSequence[0]
57+
58+
return storedSessionId
5159
? {
5260
user_id: 'user-123',
5361
session_id: storedSessionId,
5462
created_at: new Date(),
5563
updated_at: new Date(),
5664
}
57-
: null,
58-
)
59-
const onConflictDoUpdate = mock(async () => undefined)
60-
const values = mock(() => ({ onConflictDoUpdate }))
65+
: null
66+
})
67+
const onConflictDoNothing = mock(async () => undefined)
68+
const values = mock(() => ({ onConflictDoNothing }))
6169
const whereDelete = mock(async () => undefined)
6270

6371
return {
@@ -71,7 +79,7 @@ describe('getComposioToolsForUser', () => {
7179
delete: mock(() => ({ where: whereDelete })),
7280
} as any,
7381
findFirst,
74-
onConflictDoUpdate,
82+
onConflictDoNothing,
7583
values,
7684
whereDelete,
7785
}
@@ -84,7 +92,10 @@ describe('getComposioToolsForUser', () => {
8492
useSession = mock(async () => {
8593
throw notFound
8694
})
87-
const { db, whereDelete, values } = makeDb('stored-session')
95+
const { db, whereDelete, values } = makeDb([
96+
'stored-session',
97+
'fresh-session',
98+
])
8899

89100
const result = await getComposioToolsForUser({
90101
db,
@@ -112,6 +123,32 @@ describe('getComposioToolsForUser', () => {
112123
})
113124
})
114125

126+
test('returns the persisted session when concurrent creation stores a different session', async () => {
127+
createSession = mock(async () => ({ sessionId: 'losing-session' }))
128+
useSession = mock(async () => ({ sessionId: 'winning-session' }))
129+
const { db, values, onConflictDoNothing } = makeDb([
130+
null,
131+
'winning-session',
132+
])
133+
134+
const result = await getComposioToolsForUser({
135+
db,
136+
userId: 'user-123',
137+
logger,
138+
apiKey: 'test-composio-api-key',
139+
})
140+
141+
expect(result?.sessionId).toBe('winning-session')
142+
expect(createSession).toHaveBeenCalledWith('user-123')
143+
expect(values).toHaveBeenCalledWith({
144+
user_id: 'user-123',
145+
session_id: 'losing-session',
146+
})
147+
expect(onConflictDoNothing).toHaveBeenCalledTimes(1)
148+
expect(useSession).toHaveBeenCalledWith('winning-session')
149+
expect(getRawToolRouterSessionTools).toHaveBeenCalledWith('winning-session')
150+
})
151+
115152
test('keeps the stored session row when rehydration fails transiently', async () => {
116153
const transientError = Object.assign(new Error('Composio unavailable'), {
117154
status: 502,

web/src/server/composio.ts

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ async function getToolDefinitionsForSession(params: {
121121
}))
122122
}
123123

124-
async function saveSession(params: {
124+
async function insertSessionIfAbsent(params: {
125125
db: CodebuffPgDatabase
126126
userId: string
127127
sessionId: string
@@ -132,12 +132,8 @@ async function saveSession(params: {
132132
user_id: params.userId,
133133
session_id: params.sessionId,
134134
})
135-
.onConflictDoUpdate({
135+
.onConflictDoNothing({
136136
target: schema.composioSession.user_id,
137-
set: {
138-
session_id: params.sessionId,
139-
updated_at: new Date(),
140-
},
141137
})
142138
}
143139

@@ -182,15 +178,41 @@ async function createSessionForUser(params: {
182178
db: CodebuffPgDatabase
183179
userId: string
184180
apiKey: string
181+
logger: Logger
185182
}): Promise<CachedComposioSession> {
186183
const composio = getComposioClient(params.apiKey)
187184
const session = await composio.create(params.userId)
188-
await saveSession({
185+
await insertSessionIfAbsent({
189186
db: params.db,
190187
userId: params.userId,
191188
sessionId: session.sessionId,
192189
})
193190

191+
const storedSession = await getStoredSessionByUser({
192+
db: params.db,
193+
userId: params.userId,
194+
})
195+
if (!storedSession) {
196+
throw new Error('Failed to persist Composio session')
197+
}
198+
199+
if (storedSession.session_id !== session.sessionId) {
200+
params.logger.info(
201+
{
202+
userId: params.userId,
203+
createdSessionId: session.sessionId,
204+
storedSessionId: storedSession.session_id,
205+
},
206+
'Using existing persisted Composio session after concurrent creation',
207+
)
208+
return rehydrateSession({
209+
userId: params.userId,
210+
sessionId: storedSession.session_id,
211+
apiKey: params.apiKey,
212+
includeTools: true,
213+
})
214+
}
215+
194216
const cachedSession: CachedComposioSession = {
195217
userId: params.userId,
196218
sessionId: session.sessionId,
@@ -322,6 +344,7 @@ async function getSessionForUser(params: {
322344
db: params.db,
323345
userId: params.userId,
324346
apiKey,
347+
logger: params.logger,
325348
})
326349
} catch (error) {
327350
params.logger.error(

0 commit comments

Comments
 (0)