Skip to content

Commit b7d2777

Browse files
committed
improvement(cleanup): cleanup refs along in logs cleanup job
1 parent 279010a commit b7d2777

4 files changed

Lines changed: 304 additions & 78 deletions

File tree

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { describe, expect, it, vi } from 'vitest'
5+
import { collectExecutionLargeValueKeys } from '@/background/cleanup-logs'
6+
7+
vi.mock('@trigger.dev/sdk', () => ({
8+
task: vi.fn((definition) => definition),
9+
}))
10+
11+
describe('collectExecutionLargeValueKeys', () => {
12+
it('collects large-value keys owned by the purged execution', () => {
13+
const ownedRef = {
14+
__simLargeValueRef: true,
15+
version: 1,
16+
id: 'lv_OWNEDREF0001',
17+
kind: 'object',
18+
size: 128,
19+
key: 'execution/workspace-1/workflow-1/execution-1/large-value-lv_OWNEDREF0001.json',
20+
executionId: 'execution-1',
21+
}
22+
const ownedManifest = {
23+
__simLargeArrayManifest: true,
24+
version: 2,
25+
kind: 'array',
26+
totalCount: 1,
27+
chunkCount: 1,
28+
byteSize: 128,
29+
chunks: [
30+
{
31+
ref: {
32+
__simLargeValueRef: true,
33+
version: 1,
34+
id: 'lv_CHUNKREF0001',
35+
kind: 'array',
36+
size: 128,
37+
key: 'execution/workspace-1/workflow-1/execution-1/large-value-lv_CHUNKREF0001.json',
38+
executionId: 'execution-1',
39+
},
40+
count: 1,
41+
byteSize: 128,
42+
},
43+
],
44+
preview: [],
45+
}
46+
const inheritedRef = {
47+
__simLargeValueRef: true,
48+
version: 1,
49+
id: 'lv_INHERITED001',
50+
kind: 'object',
51+
size: 128,
52+
key: 'execution/workspace-1/workflow-1/source-execution/large-value-lv_INHERITED001.json',
53+
executionId: 'source-execution',
54+
}
55+
56+
const keys = collectExecutionLargeValueKeys(
57+
{
58+
output: {
59+
ownedRef,
60+
ownedManifest,
61+
inheritedRef,
62+
},
63+
},
64+
'execution-1'
65+
)
66+
67+
expect(keys.sort()).toEqual([
68+
'execution/workspace-1/workflow-1/execution-1/large-value-lv_CHUNKREF0001.json',
69+
'execution/workspace-1/workflow-1/execution-1/large-value-lv_OWNEDREF0001.json',
70+
])
71+
})
72+
73+
it('deduplicates repeated refs and ignores keyless cache-only refs', () => {
74+
const ref = {
75+
__simLargeValueRef: true,
76+
version: 1,
77+
id: 'lv_REPEATREF001',
78+
kind: 'string',
79+
size: 128,
80+
key: 'execution/workspace-1/workflow-1/execution-1/large-value-lv_REPEATREF001.json',
81+
executionId: 'execution-1',
82+
}
83+
84+
const keys = collectExecutionLargeValueKeys(
85+
{
86+
first: ref,
87+
second: ref,
88+
keyless: {
89+
__simLargeValueRef: true,
90+
version: 1,
91+
id: 'lv_KEYLESSREF01',
92+
kind: 'string',
93+
size: 128,
94+
executionId: 'execution-1',
95+
},
96+
},
97+
'execution-1'
98+
)
99+
100+
expect(keys).toEqual([
101+
'execution/workspace-1/workflow-1/execution-1/large-value-lv_REPEATREF001.json',
102+
])
103+
})
104+
})

apps/sim/background/cleanup-logs.ts

Lines changed: 131 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import { db } from '@sim/db'
2-
import { jobExecutionLogs, workflowExecutionLogs } from '@sim/db/schema'
2+
import { jobExecutionLogs, pausedExecutions, workflowExecutionLogs } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
44
import { task } from '@trigger.dev/sdk'
5-
import { and, inArray, lt } from 'drizzle-orm'
5+
import { and, eq, inArray, isNull, lt, notInArray, or } from 'drizzle-orm'
66
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
77
import {
88
batchDeleteByWorkspaceAndTimestamp,
99
chunkedBatchDelete,
1010
type TableCleanupResult,
1111
} from '@/lib/cleanup/batch-delete'
12+
import { isLargeValueRef } from '@/lib/execution/payloads/large-value-ref'
1213
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
1314
import { isUsingCloudStorage, StorageService } from '@/lib/uploads'
1415
import { deleteFileMetadata } from '@/lib/uploads/server/metadata'
@@ -19,6 +20,60 @@ interface FileDeleteStats {
1920
filesTotal: number
2021
filesDeleted: number
2122
filesDeleteFailed: number
23+
largeValuesTotal: number
24+
largeValuesDeleted: number
25+
largeValuesDeleteFailed: number
26+
}
27+
28+
const RESUMABLE_PAUSED_STATUSES = ['paused', 'partially_resumed', 'cancelling']
29+
30+
export function collectExecutionLargeValueKeys(value: unknown, executionId: string): string[] {
31+
const keys = new Set<string>()
32+
collectExecutionLargeValueKeysInto(value, executionId, new WeakSet<object>(), keys)
33+
return Array.from(keys)
34+
}
35+
36+
function getExecutionIdFromStorageKey(key: string): string | undefined {
37+
const parts = key.split('/')
38+
if (parts[0] !== 'execution' || parts.length < 5) {
39+
return undefined
40+
}
41+
return parts[3]
42+
}
43+
44+
function collectExecutionLargeValueKeysInto(
45+
value: unknown,
46+
executionId: string,
47+
seen: WeakSet<object>,
48+
keys: Set<string>
49+
): void {
50+
if (!value || typeof value !== 'object') {
51+
return
52+
}
53+
54+
if (seen.has(value)) {
55+
return
56+
}
57+
58+
if (isLargeValueRef(value)) {
59+
if (value.key && getExecutionIdFromStorageKey(value.key) === executionId) {
60+
keys.add(value.key)
61+
}
62+
return
63+
}
64+
65+
seen.add(value)
66+
67+
if (Array.isArray(value)) {
68+
for (const item of value) {
69+
collectExecutionLargeValueKeysInto(item, executionId, seen, keys)
70+
}
71+
return
72+
}
73+
74+
for (const entryValue of Object.values(value)) {
75+
collectExecutionLargeValueKeysInto(entryValue, executionId, seen, keys)
76+
}
2277
}
2378

2479
async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise<void> {
@@ -41,36 +96,99 @@ async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Pro
4196
)
4297
}
4398

99+
async function deleteLargeValueStorageKeys(keys: string[], stats: FileDeleteStats): Promise<void> {
100+
if (!isUsingCloudStorage() || keys.length === 0) return
101+
102+
const uniqueKeys = Array.from(new Set(keys))
103+
stats.largeValuesTotal += uniqueKeys.length
104+
105+
await Promise.all(
106+
uniqueKeys.map(async (key) => {
107+
try {
108+
await StorageService.deleteFile({ key, context: 'execution' })
109+
await deleteFileMetadata(key)
110+
stats.largeValuesDeleted++
111+
} catch (error) {
112+
stats.largeValuesDeleteFailed++
113+
logger.error(`Failed to delete large execution value ${key}:`, { error })
114+
}
115+
})
116+
)
117+
}
118+
44119
async function cleanupWorkflowExecutionLogs(
45120
workspaceIds: string[],
46121
retentionDate: Date,
47122
label: string
48123
): Promise<TableCleanupResult & FileDeleteStats> {
49-
const fileStats: FileDeleteStats = { filesTotal: 0, filesDeleted: 0, filesDeleteFailed: 0 }
124+
const fileStats: FileDeleteStats = {
125+
filesTotal: 0,
126+
filesDeleted: 0,
127+
filesDeleteFailed: 0,
128+
largeValuesTotal: 0,
129+
largeValuesDeleted: 0,
130+
largeValuesDeleteFailed: 0,
131+
}
50132

51133
const dbStats = await chunkedBatchDelete({
52134
tableDef: workflowExecutionLogs,
53135
workspaceIds,
54136
tableName: `${label}/workflow_execution_logs`,
55137
selectChunk: (chunkIds, limit) =>
56138
db
57-
.select({ id: workflowExecutionLogs.id, files: workflowExecutionLogs.files })
139+
.select({
140+
id: workflowExecutionLogs.id,
141+
executionId: workflowExecutionLogs.executionId,
142+
executionData: workflowExecutionLogs.executionData,
143+
files: workflowExecutionLogs.files,
144+
})
58145
.from(workflowExecutionLogs)
146+
.leftJoin(
147+
pausedExecutions,
148+
eq(pausedExecutions.executionId, workflowExecutionLogs.executionId)
149+
)
59150
.where(
60151
and(
61152
inArray(workflowExecutionLogs.workspaceId, chunkIds),
62-
lt(workflowExecutionLogs.startedAt, retentionDate)
153+
lt(workflowExecutionLogs.startedAt, retentionDate),
154+
or(
155+
isNull(pausedExecutions.status),
156+
notInArray(pausedExecutions.status, RESUMABLE_PAUSED_STATUSES)
157+
)
63158
)
64159
)
65160
.limit(limit),
66161
onBatch: async (rows) => {
67-
for (const row of rows) await deleteExecutionFiles(row.files, fileStats)
162+
for (const row of rows) {
163+
await deleteExecutionFiles(row.files, fileStats)
164+
await deleteLargeValueStorageKeys(
165+
collectExecutionLargeValueKeys(row.executionData, row.executionId),
166+
fileStats
167+
)
168+
}
68169
},
69170
})
70171

71172
return { ...dbStats, ...fileStats }
72173
}
73174

175+
async function cleanupFreePlanOrphanedSnapshots(
176+
payload: CleanupJobPayload,
177+
retentionHours: number
178+
): Promise<void> {
179+
if (payload.plan !== 'free') {
180+
return
181+
}
182+
183+
try {
184+
const retentionDays = Math.floor(retentionHours / 24)
185+
const snapshotsCleaned = await snapshotService.cleanupOrphanedSnapshots(retentionDays + 1)
186+
logger.info(`Cleaned up ${snapshotsCleaned} orphaned snapshots`)
187+
} catch (snapshotError) {
188+
logger.error('Error cleaning up orphaned snapshots:', { snapshotError })
189+
}
190+
}
191+
74192
export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void> {
75193
const startTime = Date.now()
76194

@@ -82,12 +200,14 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
82200

83201
const { workspaceIds, retentionHours, label } = scope
84202

203+
const retentionDate = new Date(Date.now() - retentionHours * 60 * 60 * 1000)
204+
85205
if (workspaceIds.length === 0) {
86206
logger.info(`[${label}] No workspaces to process`)
207+
await cleanupFreePlanOrphanedSnapshots(payload, retentionHours)
87208
return
88209
}
89210

90-
const retentionDate = new Date(Date.now() - retentionHours * 60 * 60 * 1000)
91211
logger.info(
92212
`[${label}] Cleaning ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}`
93213
)
@@ -96,6 +216,9 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
96216
logger.info(
97217
`[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed`
98218
)
219+
logger.info(
220+
`[${label}] workflow_execution_logs large values: ${workflowResults.largeValuesDeleted}/${workflowResults.largeValuesTotal} deleted, ${workflowResults.largeValuesDeleteFailed} failed`
221+
)
99222

100223
await batchDeleteByWorkspaceAndTimestamp({
101224
tableDef: jobExecutionLogs,
@@ -106,16 +229,7 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
106229
tableName: `${label}/job_execution_logs`,
107230
})
108231

109-
// Snapshot cleanup runs only on the free job to avoid running it N times for N enterprise workspaces.
110-
if (payload.plan === 'free') {
111-
try {
112-
const retentionDays = Math.floor(retentionHours / 24)
113-
const snapshotsCleaned = await snapshotService.cleanupOrphanedSnapshots(retentionDays + 1)
114-
logger.info(`Cleaned up ${snapshotsCleaned} orphaned snapshots`)
115-
} catch (snapshotError) {
116-
logger.error('Error cleaning up orphaned snapshots:', { snapshotError })
117-
}
118-
}
232+
await cleanupFreePlanOrphanedSnapshots(payload, retentionHours)
119233

120234
const timeElapsed = (Date.now() - startTime) / 1000
121235
logger.info(`[${label}] Job completed in ${timeElapsed.toFixed(2)}s`)

0 commit comments

Comments
 (0)