Skip to content

Commit 467130b

Browse files
committed
Recover invalid Composio sessions
1 parent 3b8ea61 commit 467130b

2 files changed

Lines changed: 220 additions & 6 deletions

File tree

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import { beforeAll, beforeEach, describe, expect, mock, test } from 'bun:test'
2+
3+
import type { Logger } from '@codebuff/common/types/contracts/logger'
4+
import type { getComposioToolsForUser as GetComposioToolsForUser } from '../composio'
5+
6+
let getComposioToolsForUser: typeof GetComposioToolsForUser
7+
8+
let createSession: ReturnType<typeof mock>
9+
let useSession: ReturnType<typeof mock>
10+
let getRawToolRouterSessionTools: ReturnType<typeof mock>
11+
12+
beforeAll(async () => {
13+
mock.module('server-only', () => ({}))
14+
mock.module('@codebuff/internal/env', () => ({
15+
env: { COMPOSIO_API_KEY: 'test-composio-api-key' },
16+
}))
17+
mock.module('@composio/core', () => ({
18+
Composio: class {
19+
tools = {
20+
getRawToolRouterSessionTools,
21+
}
22+
23+
create = createSession
24+
use = useSession
25+
},
26+
}))
27+
;({ getComposioToolsForUser } = await import('../composio'))
28+
})
29+
30+
describe('getComposioToolsForUser', () => {
31+
let logger: Logger
32+
33+
beforeEach(() => {
34+
logger = {
35+
error: mock(() => {}),
36+
warn: mock(() => {}),
37+
info: mock(() => {}),
38+
debug: mock(() => {}),
39+
}
40+
createSession = mock(async () => ({ sessionId: 'fresh-session' }))
41+
useSession = mock(async () => ({ sessionId: 'stored-session' }))
42+
getRawToolRouterSessionTools = mock(async () => [
43+
{
44+
slug: 'COMPOSIO_SEARCH_TOOLS',
45+
inputParameters: { type: 'object', properties: {} },
46+
description: 'Search tools',
47+
},
48+
])
49+
})
50+
51+
function makeDb(storedSessionId: string | null) {
52+
const findFirst = mock(async () =>
53+
storedSessionId
54+
? {
55+
user_id: 'user-123',
56+
session_id: storedSessionId,
57+
created_at: new Date(),
58+
updated_at: new Date(),
59+
}
60+
: null,
61+
)
62+
const onConflictDoUpdate = mock(async () => undefined)
63+
const values = mock(() => ({ onConflictDoUpdate }))
64+
const whereDelete = mock(async () => undefined)
65+
66+
return {
67+
db: {
68+
query: {
69+
composioSession: {
70+
findFirst,
71+
},
72+
},
73+
insert: mock(() => ({ values })),
74+
delete: mock(() => ({ where: whereDelete })),
75+
} as any,
76+
findFirst,
77+
onConflictDoUpdate,
78+
values,
79+
whereDelete,
80+
}
81+
}
82+
83+
test('replaces a stored session when Composio can no longer rehydrate it', async () => {
84+
const notFound = Object.assign(new Error('Composio session not found'), {
85+
status: 404,
86+
})
87+
useSession = mock(async () => {
88+
throw notFound
89+
})
90+
const { db, whereDelete, values } = makeDb('stored-session')
91+
92+
const result = await getComposioToolsForUser({
93+
db,
94+
userId: 'user-123',
95+
logger,
96+
})
97+
98+
expect(result).toEqual({
99+
sessionId: 'fresh-session',
100+
tools: [
101+
{
102+
toolName: 'COMPOSIO_SEARCH_TOOLS',
103+
inputSchema: { type: 'object', properties: {} },
104+
description: 'Search tools',
105+
},
106+
],
107+
})
108+
expect(useSession).toHaveBeenCalledWith('stored-session')
109+
expect(whereDelete).toHaveBeenCalledTimes(1)
110+
expect(createSession).toHaveBeenCalledWith('user-123')
111+
expect(values).toHaveBeenCalledWith({
112+
user_id: 'user-123',
113+
session_id: 'fresh-session',
114+
})
115+
})
116+
117+
test('keeps the stored session row when rehydration fails transiently', async () => {
118+
const transientError = Object.assign(new Error('Composio unavailable'), {
119+
status: 502,
120+
})
121+
useSession = mock(async () => {
122+
throw transientError
123+
})
124+
const { db, whereDelete } = makeDb('stored-session')
125+
126+
await expect(
127+
getComposioToolsForUser({
128+
db,
129+
userId: 'user-123',
130+
logger,
131+
}),
132+
).rejects.toThrow('Composio unavailable')
133+
134+
expect(whereDelete).not.toHaveBeenCalled()
135+
expect(createSession).not.toHaveBeenCalled()
136+
})
137+
})

web/src/server/composio.ts

Lines changed: 83 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,21 @@ async function getStoredSessionById(params: {
163163
})
164164
}
165165

166+
async function deleteStoredSession(params: {
167+
db: CodebuffPgDatabase
168+
userId: string
169+
sessionId: string
170+
}) {
171+
await params.db
172+
.delete(schema.composioSession)
173+
.where(
174+
and(
175+
eq(schema.composioSession.user_id, params.userId),
176+
eq(schema.composioSession.session_id, params.sessionId),
177+
),
178+
)
179+
}
180+
166181
async function createSessionForUser(params: {
167182
db: CodebuffPgDatabase
168183
userId: string
@@ -188,6 +203,48 @@ async function createSessionForUser(params: {
188203
return cachedSession
189204
}
190205

206+
function getErrorStatus(error: unknown): number | undefined {
207+
if (!error || typeof error !== 'object') return undefined
208+
209+
const candidates = [
210+
'status',
211+
'statusCode',
212+
'code',
213+
'responseStatus',
214+
'httpStatus',
215+
]
216+
for (const key of candidates) {
217+
const value = (error as Record<string, unknown>)[key]
218+
if (typeof value === 'number') return value
219+
if (typeof value === 'string' && /^\d+$/.test(value)) {
220+
return Number(value)
221+
}
222+
}
223+
224+
const response = (error as Record<string, unknown>)['response']
225+
return getErrorStatus(response)
226+
}
227+
228+
function isInvalidStoredSessionError(error: unknown): boolean {
229+
const status = getErrorStatus(error)
230+
if (status && [400, 401, 403, 404, 410].includes(status)) {
231+
return true
232+
}
233+
234+
if (!(error instanceof Error)) return false
235+
236+
const message = error.message.toLowerCase()
237+
return (
238+
message.includes('session') &&
239+
(message.includes('not found') ||
240+
message.includes('not exist') ||
241+
message.includes('invalid') ||
242+
message.includes('expired') ||
243+
message.includes('unauthorized') ||
244+
message.includes('forbidden'))
245+
)
246+
}
247+
191248
async function rehydrateSession(params: {
192249
userId: string
193250
sessionId: string
@@ -228,12 +285,32 @@ async function getSessionForUser(params: {
228285
{ userId: params.userId },
229286
'Rehydrating Composio session from database',
230287
)
231-
return rehydrateSession({
232-
userId: params.userId,
233-
sessionId: storedSession.session_id,
234-
apiKey,
235-
includeTools: true,
236-
})
288+
try {
289+
return await rehydrateSession({
290+
userId: params.userId,
291+
sessionId: storedSession.session_id,
292+
apiKey,
293+
includeTools: true,
294+
})
295+
} catch (error) {
296+
if (!isInvalidStoredSessionError(error)) {
297+
throw error
298+
}
299+
300+
params.logger.warn(
301+
{
302+
error: getErrorObject(error),
303+
userId: params.userId,
304+
sessionId: storedSession.session_id,
305+
},
306+
'Stored Composio session is invalid; replacing it',
307+
)
308+
await deleteStoredSession({
309+
db: params.db,
310+
userId: params.userId,
311+
sessionId: storedSession.session_id,
312+
})
313+
}
237314
}
238315

239316
params.logger.info(

0 commit comments

Comments
 (0)