@@ -4,13 +4,20 @@ import { account, credential, credentialSetMember } from '@sim/db/schema'
44import { createLogger } from '@sim/logger'
55import { toError } from '@sim/utils/errors'
66import { and , desc , eq , inArray } from 'drizzle-orm'
7+ import { withLeaderLock } from '@/lib/concurrency/leader-lock'
8+ import { coalesceLocally } from '@/lib/concurrency/singleflight'
79import { decryptSecret } from '@/lib/core/security/encryption'
810import { refreshOAuthToken } from '@/lib/oauth'
911import {
1012 getMicrosoftRefreshTokenExpiry ,
1113 isMicrosoftProvider ,
1214 PROACTIVE_REFRESH_THRESHOLD_DAYS ,
1315} from '@/lib/oauth/microsoft'
16+ import {
17+ getRecentTerminalError ,
18+ isTerminalRefreshError ,
19+ markCredentialDead ,
20+ } from '@/lib/oauth/terminal-errors'
1421import {
1522 ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID ,
1623 ATLASSIAN_SERVICE_ACCOUNT_SECRET_TYPE ,
@@ -318,6 +325,92 @@ export async function getCredential(requestId: string, credentialId: string, use
318325 return getCredentialByAccountId ( requestId , resolved . accountId , userId )
319326}
320327
328+ interface CoalescedRefreshOptions {
329+ accountId : string
330+ providerId : string
331+ refreshToken : string
332+ requestId ?: string
333+ userId ?: string
334+ }
335+
336+ async function performCoalescedRefresh ( {
337+ accountId,
338+ providerId,
339+ refreshToken,
340+ requestId,
341+ userId,
342+ } : CoalescedRefreshOptions ) : Promise < string | null > {
343+ const logContext = {
344+ ...( requestId ? { requestId } : { } ) ,
345+ ...( userId ? { userId } : { } ) ,
346+ providerId,
347+ accountId,
348+ }
349+
350+ const deadCode = await getRecentTerminalError ( accountId )
351+ if ( deadCode ) {
352+ logger . warn ( 'Skipping refresh: credential recently failed' , {
353+ ...logContext ,
354+ errorCode : deadCode ,
355+ } )
356+ return null
357+ }
358+
359+ const lockKey = `oauth:refresh:${ accountId } `
360+
361+ return coalesceLocally ( lockKey , ( ) =>
362+ withLeaderLock < string > ( {
363+ key : lockKey ,
364+ onLeader : async ( ) => {
365+ const result = await refreshOAuthToken ( providerId , refreshToken )
366+
367+ if ( ! result . ok ) {
368+ logger . error ( 'Failed to refresh token' , {
369+ ...logContext ,
370+ errorCode : result . errorCode ,
371+ } )
372+ if ( result . errorCode && isTerminalRefreshError ( result . errorCode ) ) {
373+ await markCredentialDead ( accountId , result . errorCode )
374+ }
375+ return null
376+ }
377+
378+ const updateData : Record < string , unknown > = {
379+ accessToken : result . accessToken ,
380+ accessTokenExpiresAt : new Date ( Date . now ( ) + result . expiresIn * 1000 ) ,
381+ updatedAt : new Date ( ) ,
382+ }
383+ if ( result . refreshToken && result . refreshToken !== refreshToken ) {
384+ updateData . refreshToken = result . refreshToken
385+ }
386+ if ( isMicrosoftProvider ( providerId ) ) {
387+ updateData . refreshTokenExpiresAt = getMicrosoftRefreshTokenExpiry ( )
388+ }
389+
390+ await db . update ( account ) . set ( updateData ) . where ( eq ( account . id , accountId ) )
391+
392+ logger . info ( 'Successfully refreshed access token' , logContext )
393+ return result . accessToken
394+ } ,
395+ onFollower : async ( ) => {
396+ const [ row ] = await db
397+ . select ( {
398+ accessToken : account . accessToken ,
399+ accessTokenExpiresAt : account . accessTokenExpiresAt ,
400+ } )
401+ . from ( account )
402+ . where ( eq ( account . id , accountId ) )
403+ . limit ( 1 )
404+ if ( row ?. accessToken && row . accessTokenExpiresAt && row . accessTokenExpiresAt > new Date ( ) ) {
405+ logger . info ( 'Got fresh access token from coalesced refresh' , logContext )
406+ return row . accessToken
407+ }
408+ return null
409+ } ,
410+ } )
411+ )
412+ }
413+
321414export async function getOAuthToken ( userId : string , providerId : string ) : Promise < string | null > {
322415 const connections = await db
323416 . select ( {
@@ -347,52 +440,12 @@ export async function getOAuthToken(userId: string, providerId: string): Promise
347440 ! ! credential . refreshToken && ( ! credential . accessToken || ( tokenExpiry && tokenExpiry < now ) )
348441
349442 if ( shouldAttemptRefresh ) {
350- logger . info (
351- `Access token expired for user ${ userId } , provider ${ providerId } . Attempting to refresh.`
352- )
353-
354- try {
355- // Use the existing refreshOAuthToken function
356- const refreshResult = await refreshOAuthToken ( providerId , credential . refreshToken ! )
357-
358- if ( ! refreshResult ) {
359- logger . error ( `Failed to refresh token for user ${ userId } , provider ${ providerId } ` , {
360- providerId,
361- userId,
362- hasRefreshToken : ! ! credential . refreshToken ,
363- } )
364- return null
365- }
366-
367- const { accessToken, expiresIn, refreshToken : newRefreshToken } = refreshResult
368-
369- // Update the database with new tokens
370- const updateData : any = {
371- accessToken,
372- accessTokenExpiresAt : new Date ( Date . now ( ) + expiresIn * 1000 ) , // Convert seconds to milliseconds
373- updatedAt : new Date ( ) ,
374- }
375-
376- // If we received a new refresh token (some providers like Airtable rotate them), save it
377- if ( newRefreshToken && newRefreshToken !== credential . refreshToken ) {
378- logger . info ( `Updating refresh token for user ${ userId } , provider ${ providerId } ` )
379- updateData . refreshToken = newRefreshToken
380- }
381-
382- // Update the token in the database with the actual expiration time from the provider
383- await db . update ( account ) . set ( updateData ) . where ( eq ( account . id , credential . id ) )
384-
385- logger . info ( `Successfully refreshed token for user ${ userId } , provider ${ providerId } ` )
386- return accessToken
387- } catch ( error ) {
388- logger . error ( `Error refreshing token for user ${ userId } , provider ${ providerId } ` , {
389- error : toError ( error ) . message ,
390- stack : error instanceof Error ? error . stack : undefined ,
391- providerId,
392- userId,
393- } )
394- return null
395- }
443+ return performCoalescedRefresh ( {
444+ accountId : credential . id ,
445+ providerId,
446+ refreshToken : credential . refreshToken ! ,
447+ userId,
448+ } )
396449 }
397450
398451 if ( ! credential . accessToken ) {
@@ -472,66 +525,27 @@ export async function refreshAccessTokenIfNeeded(
472525 const accessToken = credential . accessToken
473526
474527 if ( shouldRefresh ) {
475- logger . info ( `[${ requestId } ] Refreshing token for credential` )
476- try {
477- const refreshedToken = await refreshOAuthToken (
478- credential . providerId ,
479- credential . refreshToken !
480- )
481-
482- if ( ! refreshedToken ) {
483- logger . error ( `[${ requestId } ] Failed to refresh token for credential: ${ credentialId } ` , {
484- credentialId,
485- providerId : credential . providerId ,
486- userId : credential . userId ,
487- hasRefreshToken : ! ! credential . refreshToken ,
488- } )
489- if ( ! accessTokenNeedsRefresh && accessToken ) {
490- logger . info ( `[${ requestId } ] Proactive refresh failed but access token still valid` )
491- return accessToken
492- }
493- return null
494- }
495-
496- // Prepare update data
497- const updateData : Record < string , unknown > = {
498- accessToken : refreshedToken . accessToken ,
499- accessTokenExpiresAt : new Date ( Date . now ( ) + refreshedToken . expiresIn * 1000 ) ,
500- updatedAt : new Date ( ) ,
501- }
502-
503- // If we received a new refresh token, update it
504- if ( refreshedToken . refreshToken && refreshedToken . refreshToken !== credential . refreshToken ) {
505- logger . info ( `[${ requestId } ] Updating refresh token for credential` )
506- updateData . refreshToken = refreshedToken . refreshToken
507- }
528+ const resolvedCredentialId =
529+ ( credential as { resolvedCredentialId ?: string } ) . resolvedCredentialId ?? credentialId
508530
509- if ( isMicrosoftProvider ( credential . providerId ) ) {
510- updateData . refreshTokenExpiresAt = getMicrosoftRefreshTokenExpiry ( )
511- }
531+ const fresh = await performCoalescedRefresh ( {
532+ accountId : resolvedCredentialId ,
533+ providerId : credential . providerId ,
534+ refreshToken : credential . refreshToken ! ,
535+ requestId,
536+ userId : credential . userId ,
537+ } )
538+ if ( fresh ) return fresh
512539
513- // Update the token in the database
514- const resolvedCredentialId =
515- ( credential as { resolvedCredentialId ?: string } ) . resolvedCredentialId ?? credentialId
516- await db . update ( account ) . set ( updateData ) . where ( eq ( account . id , resolvedCredentialId ) )
517-
518- logger . info ( `[${ requestId } ] Successfully refreshed access token for credential` )
519- return refreshedToken . accessToken
520- } catch ( error ) {
521- logger . error ( `[${ requestId } ] Error refreshing token for credential` , {
522- error : toError ( error ) . message ,
523- stack : error instanceof Error ? error . stack : undefined ,
524- providerId : credential . providerId ,
525- credentialId,
526- userId : credential . userId ,
527- } )
528- if ( ! accessTokenNeedsRefresh && accessToken ) {
529- logger . info ( `[${ requestId } ] Proactive refresh failed but access token still valid` )
530- return accessToken
531- }
532- return null
540+ // If refresh was only triggered proactively (Microsoft refresh-token aging),
541+ // the still-valid access token is a fine fallback.
542+ if ( ! accessTokenNeedsRefresh && accessToken ) {
543+ logger . info ( `[${ requestId } ] Refresh unavailable; reusing still-valid access token` )
544+ return accessToken
533545 }
534- } else if ( ! accessToken ) {
546+ return null
547+ }
548+ if ( ! accessToken ) {
535549 // We have no access token and either no refresh token or not eligible to refresh
536550 logger . error ( `[${ requestId } ] Missing access token for credential` )
537551 return null
@@ -580,65 +594,20 @@ export async function refreshTokenIfNeeded(
580594 return { accessToken : credential . accessToken , refreshed : false }
581595 }
582596
583- try {
584- const refreshResult = await refreshOAuthToken ( credential . providerId , credential . refreshToken ! )
585-
586- if ( ! refreshResult ) {
587- logger . error ( `[${ requestId } ] Failed to refresh token for credential` )
588- if ( ! accessTokenNeedsRefresh && credential . accessToken ) {
589- logger . info ( `[${ requestId } ] Proactive refresh failed but access token still valid` )
590- return { accessToken : credential . accessToken , refreshed : false }
591- }
592- throw new Error ( 'Failed to refresh token' )
593- }
594-
595- const { accessToken : refreshedToken , expiresIn, refreshToken : newRefreshToken } = refreshResult
596-
597- // Prepare update data
598- const updateData : Record < string , unknown > = {
599- accessToken : refreshedToken ,
600- accessTokenExpiresAt : new Date ( Date . now ( ) + expiresIn * 1000 ) , // Use provider's expiry
601- updatedAt : new Date ( ) ,
602- }
603-
604- // If we received a new refresh token, update it
605- if ( newRefreshToken && newRefreshToken !== credential . refreshToken ) {
606- logger . info ( `[${ requestId } ] Updating refresh token` )
607- updateData . refreshToken = newRefreshToken
608- }
609-
610- if ( isMicrosoftProvider ( credential . providerId ) ) {
611- updateData . refreshTokenExpiresAt = getMicrosoftRefreshTokenExpiry ( )
612- }
613-
614- await db . update ( account ) . set ( updateData ) . where ( eq ( account . id , resolvedCredentialId ) )
615-
616- logger . info ( `[${ requestId } ] Successfully refreshed access token` )
617- return { accessToken : refreshedToken , refreshed : true }
618- } catch ( error ) {
619- logger . warn (
620- `[${ requestId } ] Refresh attempt failed, checking if another concurrent request succeeded`
621- )
622-
623- const freshCredential = await getCredential ( requestId , resolvedCredentialId , credential . userId )
624- if ( freshCredential ?. accessToken ) {
625- const freshExpiresAt = freshCredential . accessTokenExpiresAt
626- const stillValid = ! freshExpiresAt || freshExpiresAt > new Date ( )
627-
628- if ( stillValid ) {
629- logger . info ( `[${ requestId } ] Found valid token from concurrent refresh, using it` )
630- return { accessToken : freshCredential . accessToken , refreshed : true }
631- }
632- }
633-
634- if ( ! accessTokenNeedsRefresh && credential . accessToken ) {
635- logger . info ( `[${ requestId } ] Proactive refresh failed but access token still valid` )
636- return { accessToken : credential . accessToken , refreshed : false }
637- }
597+ const fresh = await performCoalescedRefresh ( {
598+ accountId : resolvedCredentialId ,
599+ providerId : credential . providerId ,
600+ refreshToken : credential . refreshToken ! ,
601+ requestId,
602+ userId : credential . userId ,
603+ } )
604+ if ( fresh ) return { accessToken : fresh , refreshed : true }
638605
639- logger . error ( `[${ requestId } ] Refresh failed and no valid token found in DB` , error )
640- throw error
606+ if ( ! accessTokenNeedsRefresh && credential . accessToken ) {
607+ logger . info ( `[${ requestId } ] Refresh unavailable; reusing still-valid access token` )
608+ return { accessToken : credential . accessToken , refreshed : false }
641609 }
610+ throw new Error ( 'Failed to refresh token' )
642611}
643612
644613export interface CredentialSetCredential {
@@ -704,7 +673,7 @@ export async function getCredentialsForCredentialSet(
704673 try {
705674 const refreshResult = await refreshOAuthToken ( providerId , cred . refreshToken )
706675
707- if ( refreshResult ) {
676+ if ( refreshResult . ok ) {
708677 accessToken = refreshResult . accessToken
709678
710679 const updateData : Record < string , unknown > = {
@@ -720,6 +689,10 @@ export async function getCredentialsForCredentialSet(
720689 await db . update ( account ) . set ( updateData ) . where ( eq ( account . id , cred . id ) )
721690
722691 logger . info ( `Refreshed token for user ${ cred . userId } , provider ${ providerId } ` )
692+ } else {
693+ logger . warn ( `Refresh failed for user ${ cred . userId } , provider ${ providerId } ` , {
694+ errorCode : refreshResult . errorCode ,
695+ } )
723696 }
724697 } catch ( error ) {
725698 logger . error ( `Failed to refresh token for user ${ cred . userId } , provider ${ providerId } ` , {
0 commit comments