11import { db } from '@sim/db'
2+ import type { WorkspaceMode } from '@sim/db/schema'
23import { organization , workspace } from '@sim/db/schema'
34import { createLogger } from '@sim/logger'
45import { tasks } from '@trigger.dev/sdk'
56import { eq , isNull } from 'drizzle-orm'
7+ import { getOrganizationSubscription } from '@/lib/billing/core/billing'
68import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
79import { getPlanType , type PlanCategory } from '@/lib/billing/plan-helpers'
810import { getJobQueue } from '@/lib/core/async-jobs'
911import { shouldExecuteInline } from '@/lib/core/async-jobs/config'
1012import type { EnqueueOptions } from '@/lib/core/async-jobs/types'
1113import { isTriggerAvailable } from '@/lib/knowledge/documents/service'
14+ import { isOrganizationWorkspace } from '@/lib/workspaces/policy'
1215
1316const logger = createLogger ( 'RetentionDispatcher' )
1417
@@ -41,11 +44,15 @@ interface CleanupJobConfig {
4144interface WorkspaceCleanupScopeRow {
4245 id : string
4346 billedAccountUserId : string
47+ organizationId : string | null
48+ workspaceMode : WorkspaceMode
4449 organizationSettings : OrganizationRetentionSettings | null
4550}
4651
4752const DAY = 24
4853
54+ type PlanResolutionEntry = readonly [ string , PlanCategory ]
55+
4956/**
5057 * Single source of truth for cleanup retention: which key each job type reads
5158 * from `organization.dataRetentionSettings`, and the default retention (in
@@ -72,6 +79,8 @@ async function listActiveWorkspaceCleanupScopeRows(): Promise<WorkspaceCleanupSc
7279 . select ( {
7380 id : workspace . id ,
7481 billedAccountUserId : workspace . billedAccountUserId ,
82+ organizationId : workspace . organizationId ,
83+ workspaceMode : workspace . workspaceMode ,
7584 organizationSettings : organization . dataRetentionSettings ,
7685 } )
7786 . from ( workspace )
@@ -91,12 +100,56 @@ async function resolvePlanTypesByBilledUserId(
91100 const billedUserIds = Array . from ( new Set ( rows . map ( ( row ) => row . billedAccountUserId ) ) )
92101 const entries = await Promise . all (
93102 billedUserIds . map ( async ( userId ) => {
94- const subscription = await getHighestPrioritySubscription ( userId , { onError : 'throw' } )
95- return [ userId , getPlanType ( subscription ?. plan ) ] as const
103+ try {
104+ const subscription = await getHighestPrioritySubscription ( userId , { onError : 'throw' } )
105+ return [ userId , getPlanType ( subscription ?. plan ) ] as const
106+ } catch ( error ) {
107+ logger . error ( 'Skipping cleanup for billed user after plan lookup failed' , {
108+ userId,
109+ error,
110+ } )
111+ return null
112+ }
113+ } )
114+ )
115+
116+ return new Map ( entries . filter ( ( entry ) : entry is PlanResolutionEntry => entry !== null ) )
117+ }
118+
119+ async function resolvePlanTypesByWorkspaceId (
120+ rows : WorkspaceCleanupScopeRow [ ]
121+ ) : Promise < Map < string , PlanCategory > > {
122+ const userScopedRows = rows . filter ( ( row ) => ! isOrganizationWorkspace ( row ) )
123+ const userPlanByBilledUserId = await resolvePlanTypesByBilledUserId ( userScopedRows )
124+ const entries = await Promise . all (
125+ rows . map ( async ( row ) => {
126+ const organizationId = isOrganizationWorkspace ( row ) ? row . organizationId : null
127+ if ( organizationId ) {
128+ try {
129+ const subscription = await getOrganizationSubscription ( organizationId , {
130+ onError : 'throw' ,
131+ } )
132+ return [ row . id , getPlanType ( subscription ?. plan ) ] as const
133+ } catch ( error ) {
134+ logger . error ( 'Skipping cleanup for organization workspace after plan lookup failed' , {
135+ workspaceId : row . id ,
136+ organizationId,
137+ error,
138+ } )
139+ return null
140+ }
141+ }
142+
143+ const plan = userPlanByBilledUserId . get ( row . billedAccountUserId )
144+ if ( plan === undefined ) {
145+ return null
146+ }
147+
148+ return [ row . id , plan ] as const
96149 } )
97150 )
98151
99- return new Map ( entries )
152+ return new Map ( entries . filter ( ( entry ) : entry is PlanResolutionEntry => entry !== null ) )
100153}
101154
102155/**
@@ -107,10 +160,8 @@ async function resolvePlanTypesByBilledUserId(
107160 */
108161async function resolveWorkspaceIdsForPlan ( plan : NonEnterprisePlan ) : Promise < string [ ] > {
109162 const rows = await listActiveWorkspaceCleanupScopeRows ( )
110- const planByBilledUserId = await resolvePlanTypesByBilledUserId ( rows )
111- return rows
112- . filter ( ( row ) => planByBilledUserId . get ( row . billedAccountUserId ) === plan )
113- . map ( ( row ) => row . id )
163+ const planByWorkspaceId = await resolvePlanTypesByWorkspaceId ( rows )
164+ return rows . filter ( ( row ) => planByWorkspaceId . get ( row . id ) === plan ) . map ( ( row ) => row . id )
114165}
115166
116167export interface ResolvedCleanupScope {
@@ -198,10 +249,10 @@ export async function dispatchCleanupJobs(
198249 }
199250
200251 const activeWorkspaceRows = await listActiveWorkspaceCleanupScopeRows ( )
201- const planByBilledUserId = await resolvePlanTypesByBilledUserId ( activeWorkspaceRows )
252+ const planByWorkspaceId = await resolvePlanTypesByWorkspaceId ( activeWorkspaceRows )
202253 const enterpriseRows = activeWorkspaceRows . filter (
203254 ( row ) =>
204- planByBilledUserId . get ( row . billedAccountUserId ) === 'enterprise' &&
255+ planByWorkspaceId . get ( row . id ) === 'enterprise' &&
205256 row . organizationSettings ?. [ config . key ] != null
206257 )
207258
0 commit comments