Skip to content

Commit 9a07693

Browse files
committed
improvement(oauth): coalesce token refresh + scrub unknown credential refs
1 parent 3f7698c commit 9a07693

17 files changed

Lines changed: 1054 additions & 188 deletions

File tree

apps/realtime/src/handlers/subblocks.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
44
import { SUBBLOCK_OPERATIONS } from '@sim/realtime-protocol/constants'
55
import { getErrorMessage } from '@sim/utils/errors'
66
import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz'
7+
import { scrubSingleCredentialValue } from '@sim/workflow-persistence/credential-scrub'
78
import { isWorkflowBlockProtected } from '@sim/workflow-types/workflow'
89
import { and, eq } from 'drizzle-orm'
910
import type { AuthenticatedSocket } from '@/middleware/auth'
@@ -234,8 +235,19 @@ async function flushSubblockUpdate(
234235
pending: PendingSubblock,
235236
roomManager: IRoomManager
236237
) {
237-
const { blockId, subblockId, value, timestamp } = pending.latest
238+
const { blockId, subblockId, value: incomingValue, timestamp } = pending.latest
238239
const io = roomManager.io
240+
const { value, cleared: credentialCleared } = await scrubSingleCredentialValue(
241+
subblockId,
242+
incomingValue
243+
)
244+
if (credentialCleared) {
245+
logger.warn('Cleared dangling credential ref in realtime subblock update', {
246+
workflowId,
247+
blockId,
248+
subblockId,
249+
})
250+
}
239251

240252
try {
241253
// Verify workflow still exists

apps/sim/app/api/auth/oauth/utils.ts

Lines changed: 134 additions & 161 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,20 @@ import { account, credential, credentialSetMember } from '@sim/db/schema'
44
import { createLogger } from '@sim/logger'
55
import { toError } from '@sim/utils/errors'
66
import { and, desc, eq, inArray } from 'drizzle-orm'
7+
import { withLeaderLock } from '@/lib/concurrency/leader-lock'
8+
import { coalesceLocally } from '@/lib/concurrency/singleflight'
79
import { decryptSecret } from '@/lib/core/security/encryption'
810
import { refreshOAuthToken } from '@/lib/oauth'
911
import {
1012
getMicrosoftRefreshTokenExpiry,
1113
isMicrosoftProvider,
1214
PROACTIVE_REFRESH_THRESHOLD_DAYS,
1315
} from '@/lib/oauth/microsoft'
16+
import {
17+
getRecentTerminalError,
18+
isTerminalRefreshError,
19+
markCredentialDead,
20+
} from '@/lib/oauth/terminal-errors'
1421
import {
1522
ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID,
1623
ATLASSIAN_SERVICE_ACCOUNT_SECRET_TYPE,
@@ -318,6 +325,92 @@ export async function getCredential(requestId: string, credentialId: string, use
318325
return getCredentialByAccountId(requestId, resolved.accountId, userId)
319326
}
320327

328+
interface CoalescedRefreshOptions {
329+
accountId: string
330+
providerId: string
331+
refreshToken: string
332+
requestId?: string
333+
userId?: string
334+
}
335+
336+
async function performCoalescedRefresh({
337+
accountId,
338+
providerId,
339+
refreshToken,
340+
requestId,
341+
userId,
342+
}: CoalescedRefreshOptions): Promise<string | null> {
343+
const logContext = {
344+
...(requestId ? { requestId } : {}),
345+
...(userId ? { userId } : {}),
346+
providerId,
347+
accountId,
348+
}
349+
350+
const deadCode = await getRecentTerminalError(accountId)
351+
if (deadCode) {
352+
logger.warn('Skipping refresh: credential recently failed', {
353+
...logContext,
354+
errorCode: deadCode,
355+
})
356+
return null
357+
}
358+
359+
const lockKey = `oauth:refresh:${accountId}`
360+
361+
return coalesceLocally(lockKey, () =>
362+
withLeaderLock<string>({
363+
key: lockKey,
364+
onLeader: async () => {
365+
const result = await refreshOAuthToken(providerId, refreshToken)
366+
367+
if (!result.ok) {
368+
logger.error('Failed to refresh token', {
369+
...logContext,
370+
errorCode: result.errorCode,
371+
})
372+
if (result.errorCode && isTerminalRefreshError(result.errorCode)) {
373+
await markCredentialDead(accountId, result.errorCode)
374+
}
375+
return null
376+
}
377+
378+
const updateData: Record<string, unknown> = {
379+
accessToken: result.accessToken,
380+
accessTokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000),
381+
updatedAt: new Date(),
382+
}
383+
if (result.refreshToken && result.refreshToken !== refreshToken) {
384+
updateData.refreshToken = result.refreshToken
385+
}
386+
if (isMicrosoftProvider(providerId)) {
387+
updateData.refreshTokenExpiresAt = getMicrosoftRefreshTokenExpiry()
388+
}
389+
390+
await db.update(account).set(updateData).where(eq(account.id, accountId))
391+
392+
logger.info('Successfully refreshed access token', logContext)
393+
return result.accessToken
394+
},
395+
onFollower: async () => {
396+
const [row] = await db
397+
.select({
398+
accessToken: account.accessToken,
399+
accessTokenExpiresAt: account.accessTokenExpiresAt,
400+
})
401+
.from(account)
402+
.where(eq(account.id, accountId))
403+
.limit(1)
404+
if (row?.accessToken && row.accessTokenExpiresAt && row.accessTokenExpiresAt > new Date()) {
405+
logger.info('Got fresh access token from coalesced refresh', logContext)
406+
return row.accessToken
407+
}
408+
return null
409+
},
410+
})
411+
)
412+
}
413+
321414
export async function getOAuthToken(userId: string, providerId: string): Promise<string | null> {
322415
const connections = await db
323416
.select({
@@ -347,52 +440,12 @@ export async function getOAuthToken(userId: string, providerId: string): Promise
347440
!!credential.refreshToken && (!credential.accessToken || (tokenExpiry && tokenExpiry < now))
348441

349442
if (shouldAttemptRefresh) {
350-
logger.info(
351-
`Access token expired for user ${userId}, provider ${providerId}. Attempting to refresh.`
352-
)
353-
354-
try {
355-
// Use the existing refreshOAuthToken function
356-
const refreshResult = await refreshOAuthToken(providerId, credential.refreshToken!)
357-
358-
if (!refreshResult) {
359-
logger.error(`Failed to refresh token for user ${userId}, provider ${providerId}`, {
360-
providerId,
361-
userId,
362-
hasRefreshToken: !!credential.refreshToken,
363-
})
364-
return null
365-
}
366-
367-
const { accessToken, expiresIn, refreshToken: newRefreshToken } = refreshResult
368-
369-
// Update the database with new tokens
370-
const updateData: any = {
371-
accessToken,
372-
accessTokenExpiresAt: new Date(Date.now() + expiresIn * 1000), // Convert seconds to milliseconds
373-
updatedAt: new Date(),
374-
}
375-
376-
// If we received a new refresh token (some providers like Airtable rotate them), save it
377-
if (newRefreshToken && newRefreshToken !== credential.refreshToken) {
378-
logger.info(`Updating refresh token for user ${userId}, provider ${providerId}`)
379-
updateData.refreshToken = newRefreshToken
380-
}
381-
382-
// Update the token in the database with the actual expiration time from the provider
383-
await db.update(account).set(updateData).where(eq(account.id, credential.id))
384-
385-
logger.info(`Successfully refreshed token for user ${userId}, provider ${providerId}`)
386-
return accessToken
387-
} catch (error) {
388-
logger.error(`Error refreshing token for user ${userId}, provider ${providerId}`, {
389-
error: toError(error).message,
390-
stack: error instanceof Error ? error.stack : undefined,
391-
providerId,
392-
userId,
393-
})
394-
return null
395-
}
443+
return performCoalescedRefresh({
444+
accountId: credential.id,
445+
providerId,
446+
refreshToken: credential.refreshToken!,
447+
userId,
448+
})
396449
}
397450

398451
if (!credential.accessToken) {
@@ -472,66 +525,27 @@ export async function refreshAccessTokenIfNeeded(
472525
const accessToken = credential.accessToken
473526

474527
if (shouldRefresh) {
475-
logger.info(`[${requestId}] Refreshing token for credential`)
476-
try {
477-
const refreshedToken = await refreshOAuthToken(
478-
credential.providerId,
479-
credential.refreshToken!
480-
)
481-
482-
if (!refreshedToken) {
483-
logger.error(`[${requestId}] Failed to refresh token for credential: ${credentialId}`, {
484-
credentialId,
485-
providerId: credential.providerId,
486-
userId: credential.userId,
487-
hasRefreshToken: !!credential.refreshToken,
488-
})
489-
if (!accessTokenNeedsRefresh && accessToken) {
490-
logger.info(`[${requestId}] Proactive refresh failed but access token still valid`)
491-
return accessToken
492-
}
493-
return null
494-
}
495-
496-
// Prepare update data
497-
const updateData: Record<string, unknown> = {
498-
accessToken: refreshedToken.accessToken,
499-
accessTokenExpiresAt: new Date(Date.now() + refreshedToken.expiresIn * 1000),
500-
updatedAt: new Date(),
501-
}
502-
503-
// If we received a new refresh token, update it
504-
if (refreshedToken.refreshToken && refreshedToken.refreshToken !== credential.refreshToken) {
505-
logger.info(`[${requestId}] Updating refresh token for credential`)
506-
updateData.refreshToken = refreshedToken.refreshToken
507-
}
528+
const resolvedCredentialId =
529+
(credential as { resolvedCredentialId?: string }).resolvedCredentialId ?? credentialId
508530

509-
if (isMicrosoftProvider(credential.providerId)) {
510-
updateData.refreshTokenExpiresAt = getMicrosoftRefreshTokenExpiry()
511-
}
531+
const fresh = await performCoalescedRefresh({
532+
accountId: resolvedCredentialId,
533+
providerId: credential.providerId,
534+
refreshToken: credential.refreshToken!,
535+
requestId,
536+
userId: credential.userId,
537+
})
538+
if (fresh) return fresh
512539

513-
// Update the token in the database
514-
const resolvedCredentialId =
515-
(credential as { resolvedCredentialId?: string }).resolvedCredentialId ?? credentialId
516-
await db.update(account).set(updateData).where(eq(account.id, resolvedCredentialId))
517-
518-
logger.info(`[${requestId}] Successfully refreshed access token for credential`)
519-
return refreshedToken.accessToken
520-
} catch (error) {
521-
logger.error(`[${requestId}] Error refreshing token for credential`, {
522-
error: toError(error).message,
523-
stack: error instanceof Error ? error.stack : undefined,
524-
providerId: credential.providerId,
525-
credentialId,
526-
userId: credential.userId,
527-
})
528-
if (!accessTokenNeedsRefresh && accessToken) {
529-
logger.info(`[${requestId}] Proactive refresh failed but access token still valid`)
530-
return accessToken
531-
}
532-
return null
540+
// If refresh was only triggered proactively (Microsoft refresh-token aging),
541+
// the still-valid access token is a fine fallback.
542+
if (!accessTokenNeedsRefresh && accessToken) {
543+
logger.info(`[${requestId}] Refresh unavailable; reusing still-valid access token`)
544+
return accessToken
533545
}
534-
} else if (!accessToken) {
546+
return null
547+
}
548+
if (!accessToken) {
535549
// We have no access token and either no refresh token or not eligible to refresh
536550
logger.error(`[${requestId}] Missing access token for credential`)
537551
return null
@@ -580,65 +594,20 @@ export async function refreshTokenIfNeeded(
580594
return { accessToken: credential.accessToken, refreshed: false }
581595
}
582596

583-
try {
584-
const refreshResult = await refreshOAuthToken(credential.providerId, credential.refreshToken!)
585-
586-
if (!refreshResult) {
587-
logger.error(`[${requestId}] Failed to refresh token for credential`)
588-
if (!accessTokenNeedsRefresh && credential.accessToken) {
589-
logger.info(`[${requestId}] Proactive refresh failed but access token still valid`)
590-
return { accessToken: credential.accessToken, refreshed: false }
591-
}
592-
throw new Error('Failed to refresh token')
593-
}
594-
595-
const { accessToken: refreshedToken, expiresIn, refreshToken: newRefreshToken } = refreshResult
596-
597-
// Prepare update data
598-
const updateData: Record<string, unknown> = {
599-
accessToken: refreshedToken,
600-
accessTokenExpiresAt: new Date(Date.now() + expiresIn * 1000), // Use provider's expiry
601-
updatedAt: new Date(),
602-
}
603-
604-
// If we received a new refresh token, update it
605-
if (newRefreshToken && newRefreshToken !== credential.refreshToken) {
606-
logger.info(`[${requestId}] Updating refresh token`)
607-
updateData.refreshToken = newRefreshToken
608-
}
609-
610-
if (isMicrosoftProvider(credential.providerId)) {
611-
updateData.refreshTokenExpiresAt = getMicrosoftRefreshTokenExpiry()
612-
}
613-
614-
await db.update(account).set(updateData).where(eq(account.id, resolvedCredentialId))
615-
616-
logger.info(`[${requestId}] Successfully refreshed access token`)
617-
return { accessToken: refreshedToken, refreshed: true }
618-
} catch (error) {
619-
logger.warn(
620-
`[${requestId}] Refresh attempt failed, checking if another concurrent request succeeded`
621-
)
622-
623-
const freshCredential = await getCredential(requestId, resolvedCredentialId, credential.userId)
624-
if (freshCredential?.accessToken) {
625-
const freshExpiresAt = freshCredential.accessTokenExpiresAt
626-
const stillValid = !freshExpiresAt || freshExpiresAt > new Date()
627-
628-
if (stillValid) {
629-
logger.info(`[${requestId}] Found valid token from concurrent refresh, using it`)
630-
return { accessToken: freshCredential.accessToken, refreshed: true }
631-
}
632-
}
633-
634-
if (!accessTokenNeedsRefresh && credential.accessToken) {
635-
logger.info(`[${requestId}] Proactive refresh failed but access token still valid`)
636-
return { accessToken: credential.accessToken, refreshed: false }
637-
}
597+
const fresh = await performCoalescedRefresh({
598+
accountId: resolvedCredentialId,
599+
providerId: credential.providerId,
600+
refreshToken: credential.refreshToken!,
601+
requestId,
602+
userId: credential.userId,
603+
})
604+
if (fresh) return { accessToken: fresh, refreshed: true }
638605

639-
logger.error(`[${requestId}] Refresh failed and no valid token found in DB`, error)
640-
throw error
606+
if (!accessTokenNeedsRefresh && credential.accessToken) {
607+
logger.info(`[${requestId}] Refresh unavailable; reusing still-valid access token`)
608+
return { accessToken: credential.accessToken, refreshed: false }
641609
}
610+
throw new Error('Failed to refresh token')
642611
}
643612

644613
export interface CredentialSetCredential {
@@ -704,7 +673,7 @@ export async function getCredentialsForCredentialSet(
704673
try {
705674
const refreshResult = await refreshOAuthToken(providerId, cred.refreshToken)
706675

707-
if (refreshResult) {
676+
if (refreshResult.ok) {
708677
accessToken = refreshResult.accessToken
709678

710679
const updateData: Record<string, unknown> = {
@@ -720,6 +689,10 @@ export async function getCredentialsForCredentialSet(
720689
await db.update(account).set(updateData).where(eq(account.id, cred.id))
721690

722691
logger.info(`Refreshed token for user ${cred.userId}, provider ${providerId}`)
692+
} else {
693+
logger.warn(`Refresh failed for user ${cred.userId}, provider ${providerId}`, {
694+
errorCode: refreshResult.errorCode,
695+
})
723696
}
724697
} catch (error) {
725698
logger.error(`Failed to refresh token for user ${cred.userId}, provider ${providerId}`, {

0 commit comments

Comments
 (0)