11import { db } from '@sim/db'
22import {
3+ executionLargeValueDependencies ,
4+ executionLargeValueReferences ,
35 executionLargeValues ,
46 jobExecutionLogs ,
57 pausedExecutions ,
68 workflowExecutionLogs ,
9+ workspaceFiles ,
710} from '@sim/db/schema'
811import { createLogger } from '@sim/logger'
912import { task } from '@trigger.dev/sdk'
10- import { and , asc , eq , inArray , isNull , lt , notInArray , or } from 'drizzle-orm'
13+ import { and , asc , eq , inArray , isNull , lt , notInArray , or , sql } from 'drizzle-orm'
1114import type { CleanupJobPayload } from '@/lib/billing/cleanup-dispatcher'
1215import {
1316 batchDeleteByWorkspaceAndTimestamp ,
@@ -42,6 +45,7 @@ const LOG_CLEANUP_CONCURRENCY_LIMIT = 2
4245const LARGE_VALUE_CLEANUP_BATCH_SIZE = 500
4346const LARGE_VALUE_CLEANUP_TOTAL_KEY_LIMIT = 5_000
4447const LARGE_VALUE_CLEANUP_GRACE_HOURS = 7 * 24
48+ const LEGACY_LARGE_VALUE_CLEANUP_GRACE_HOURS = 30 * 24
4549const LARGE_VALUE_TOMBSTONE_RETENTION_HOURS = 30 * 24
4650
4751async function deleteExecutionFiles ( files : unknown , stats : FileDeleteStats ) : Promise < void > {
@@ -161,7 +165,11 @@ async function cleanupLargeExecutionValues(
161165 unreferencedLargeValuePredicate ( )
162166 )
163167 )
164- . orderBy ( asc ( executionLargeValues . createdAt ) , asc ( executionLargeValues . key ) )
168+ . orderBy (
169+ asc ( executionLargeValues . workspaceId ) ,
170+ asc ( executionLargeValues . createdAt ) ,
171+ asc ( executionLargeValues . key )
172+ )
165173 . limit ( limit )
166174
167175 if ( rows . length === 0 ) break
@@ -188,6 +196,159 @@ async function cleanupLargeExecutionValues(
188196 return stats
189197}
190198
/**
 * Deletes "legacy" large-value files: `workspaceFiles` rows in the `execution`
 * context whose key looks like a large-value object but that have no matching
 * row in `executionLargeValues`.
 *
 * A file is only selected when all of the following hold:
 * - it belongs to one of `workspaceIds`, is not soft-deleted, and was uploaded
 *   before `retentionDate` minus `LEGACY_LARGE_VALUE_CLEANUP_GRACE_HOURS`;
 * - its key matches `execution/<a>/<b>/<c>/large-value-lv_<...>.json`;
 * - no `executionLargeValues` row is registered under the same key;
 * - no reference row for the key points at an execution that still has a
 *   workflow execution log or a paused execution in a resumable status;
 * - no non-deleted parent value depends on it while that parent is itself
 *   still owned or referenced by such an execution;
 * - the execution id taken from the 4th `/`-separated key segment has neither
 *   a workflow execution log nor a resumable paused execution.
 *
 * Work is capped at `LARGE_VALUE_CLEANUP_TOTAL_KEY_LIMIT` attempted keys per
 * call (shared across all workspace chunks) and fetched in pages of at most
 * `LARGE_VALUE_CLEANUP_BATCH_SIZE`.
 *
 * @param workspaceIds - Workspaces to scan; an empty list returns zeroed stats without querying.
 * @param retentionDate - Base retention cutoff; the legacy grace period is subtracted from it.
 * @param label - Prefix used in the completion log line.
 * @returns Counts of keys attempted, deleted, and failed as reported by `deleteLargeValueKeys`.
 */
async function cleanupLegacyLargeExecutionValues(
  workspaceIds: string[],
  retentionDate: Date,
  label: string
): Promise<LargeValueCleanupStats> {
  const stats: LargeValueCleanupStats = {
    largeValuesTotal: 0,
    largeValuesDeleted: 0,
    largeValuesDeleteFailed: 0,
  }
  if (workspaceIds.length === 0) return stats

  // Legacy files get an extra grace window on top of the normal retention cutoff.
  const legacyRetentionDate = new Date(
    retentionDate.getTime() - LEGACY_LARGE_VALUE_CLEANUP_GRACE_HOURS * 60 * 60 * 1000
  )
  // Workspaces are processed in groups so the IN (...) list stays bounded.
  const workspaceChunkSize = 50
  const workspaceChunks = chunkArray(workspaceIds, workspaceChunkSize)
  let attempted = 0

  for (const chunkIds of workspaceChunks) {
    while (attempted < LARGE_VALUE_CLEANUP_TOTAL_KEY_LIMIT) {
      const limit = Math.min(
        LARGE_VALUE_CLEANUP_BATCH_SIZE,
        LARGE_VALUE_CLEANUP_TOTAL_KEY_LIMIT - attempted
      )
      const rows = await db
        .select({ key: workspaceFiles.key })
        .from(workspaceFiles)
        .where(
          and(
            inArray(workspaceFiles.workspaceId, chunkIds),
            eq(workspaceFiles.context, 'execution'),
            isNull(workspaceFiles.deletedAt),
            lt(workspaceFiles.uploadedAt, legacyRetentionDate),
            // `_` is a single-character wildcard in LIKE, so it is escaped with an
            // explicit ESCAPE character to match only the literal `lv_` prefix.
            sql`${workspaceFiles.key} LIKE 'execution/%/%/%/large-value-lv!_%.json' ESCAPE '!'`,
            // Not registered in the large-value table (registered keys are handled elsewhere).
            sql`NOT EXISTS (
              SELECT 1
              FROM ${executionLargeValues} AS registered_value
              WHERE registered_value.key = ${workspaceFiles.key}
            )`,
            // No reference from an execution that still has a log or a resumable pause.
            sql`NOT EXISTS (
              SELECT 1
              FROM ${executionLargeValueReferences} AS ref
              WHERE ref.key = ${workspaceFiles.key}
                AND (
                  (
                    ref.source = 'execution_log'
                    AND EXISTS (
                      SELECT 1
                      FROM ${workflowExecutionLogs} AS ref_wel
                      WHERE ref_wel.execution_id = ref.execution_id
                    )
                  )
                  OR (
                    ref.source = 'paused_snapshot'
                    AND EXISTS (
                      SELECT 1
                      FROM ${pausedExecutions} AS ref_pe
                      WHERE ref_pe.execution_id = ref.execution_id
                        AND ref_pe.status = ANY(${RESUMABLE_PAUSED_STATUSES}::text[])
                    )
                  )
                )
            )`,
            // Not a child of a non-deleted parent value that is itself still owned
            // or referenced by an execution with a log or a resumable pause.
            sql`NOT EXISTS (
              SELECT 1
              FROM ${executionLargeValueDependencies} AS dependency
              INNER JOIN ${executionLargeValues} AS parent_value
                ON parent_value.key = dependency.parent_key
                AND parent_value.deleted_at IS NULL
              WHERE dependency.child_key = ${workspaceFiles.key}
                AND dependency.workspace_id = ${workspaceFiles.workspaceId}
                AND (
                  EXISTS (
                    SELECT 1
                    FROM ${workflowExecutionLogs} AS parent_owner_wel
                    WHERE parent_owner_wel.execution_id = parent_value.owner_execution_id
                  )
                  OR EXISTS (
                    SELECT 1
                    FROM ${pausedExecutions} AS parent_owner_pe
                    WHERE parent_owner_pe.execution_id = parent_value.owner_execution_id
                      AND parent_owner_pe.status = ANY(${RESUMABLE_PAUSED_STATUSES}::text[])
                  )
                  OR EXISTS (
                    SELECT 1
                    FROM ${executionLargeValueReferences} AS parent_ref
                    WHERE parent_ref.key = parent_value.key
                      AND (
                        (
                          parent_ref.source = 'execution_log'
                          AND EXISTS (
                            SELECT 1
                            FROM ${workflowExecutionLogs} AS parent_ref_wel
                            WHERE parent_ref_wel.execution_id = parent_ref.execution_id
                          )
                        )
                        OR (
                          parent_ref.source = 'paused_snapshot'
                          AND EXISTS (
                            SELECT 1
                            FROM ${pausedExecutions} AS parent_ref_pe
                            WHERE parent_ref_pe.execution_id = parent_ref.execution_id
                              AND parent_ref_pe.status = ANY(${RESUMABLE_PAUSED_STATUSES}::text[])
                          )
                        )
                      )
                  )
                )
            )`,
            // The 4th key segment is compared against execution ids: the owning
            // execution must have no remaining log...
            sql`NOT EXISTS (
              SELECT 1
              FROM ${workflowExecutionLogs} AS owner_wel
              WHERE owner_wel.execution_id = split_part(${workspaceFiles.key}, '/', 4)
            )`,
            // ...and no paused execution that could still be resumed.
            sql`NOT EXISTS (
              SELECT 1
              FROM ${pausedExecutions} AS pe
              WHERE pe.execution_id = split_part(${workspaceFiles.key}, '/', 4)
                AND pe.status = ANY(${RESUMABLE_PAUSED_STATUSES}::text[])
            )`
          )
        )
        // Deterministic order so each page is a stable slice of the candidates.
        .orderBy(
          asc(workspaceFiles.workspaceId),
          asc(workspaceFiles.uploadedAt),
          asc(workspaceFiles.key)
        )
        .limit(limit)

      // Nothing (left) to clean in this chunk; move on to the next one.
      if (rows.length === 0) break

      const keys = rows.map((row) => row.key)
      stats.largeValuesTotal += keys.length
      attempted += keys.length
      const result = await deleteLargeValueKeys(keys)
      stats.largeValuesDeleted += result.deleted
      stats.largeValuesDeleteFailed += result.failed

      // No progress on this page: stop re-querying the chunk rather than spin
      // on the same rows.
      // NOTE(review): this loop presumably relies on deleteLargeValueKeys making
      // deleted keys stop matching the query above; keys that fail would then be
      // re-selected and counted again on the next iteration — confirm.
      if (result.deleted === 0) {
        break
      }
    }

    // The attempt budget is shared across chunks; stop once it is spent.
    if (attempted >= LARGE_VALUE_CLEANUP_TOTAL_KEY_LIMIT) break
  }

  logger.info(
    `[${label}/legacy_execution_large_values] Complete: ${stats.largeValuesDeleted}/${stats.largeValuesTotal} deleted, ${stats.largeValuesDeleteFailed} failed`
  )

  return stats
}
351+
191352async function cleanupLargeValueMetadata ( workspaceIds : string [ ] , label : string ) : Promise < void > {
192353 try {
193354 const tombstonesDeletedBefore = new Date (
@@ -286,7 +447,15 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
286447 )
287448 const largeValueResults = await cleanupLargeExecutionValues ( workspaceIds , retentionDate , label )
288449 logger . info (
289- `[${ label } ] workflow_execution_logs large values: ${ largeValueResults . largeValuesDeleted } /${ largeValueResults . largeValuesTotal } deleted, ${ largeValueResults . largeValuesDeleteFailed } failed`
450+ `[${ label } ] execution_large_values: ${ largeValueResults . largeValuesDeleted } /${ largeValueResults . largeValuesTotal } deleted, ${ largeValueResults . largeValuesDeleteFailed } failed`
451+ )
452+ const legacyLargeValueResults = await cleanupLegacyLargeExecutionValues (
453+ workspaceIds ,
454+ retentionDate ,
455+ label
456+ )
457+ logger . info (
458+ `[${ label } ] legacy_execution_large_values: ${ legacyLargeValueResults . largeValuesDeleted } /${ legacyLargeValueResults . largeValuesTotal } deleted, ${ legacyLargeValueResults . largeValuesDeleteFailed } failed`
290459 )
291460 await cleanupLargeValueMetadata ( workspaceIds , label )
292461
0 commit comments