Skip to content

Commit 4812504

Browse files
committed
fix(large-refs): cleanup based on table read
1 parent 21c956c commit 4812504

14 files changed

Lines changed: 18716 additions & 116 deletions

File tree

apps/sim/app/api/workflows/[id]/execute/response-block.test.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@ import { EXECUTION_RESOURCE_LIMIT_CODE } from '@/lib/execution/resource-errors'
1515
import type { ExecutionResult } from '@/lib/workflows/types'
1616
import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils'
1717

18-
const { mockDownloadFile, mockUploadFile, uploadedFiles } = vi.hoisted(() => ({
19-
mockDownloadFile: vi.fn(),
20-
mockUploadFile: vi.fn(),
21-
uploadedFiles: new Map<string, Buffer>(),
22-
}))
18+
const { mockDownloadFile, mockRegisterLargeValueOwner, mockUploadFile, uploadedFiles } = vi.hoisted(
19+
() => ({
20+
mockDownloadFile: vi.fn(),
21+
mockRegisterLargeValueOwner: vi.fn(),
22+
mockUploadFile: vi.fn(),
23+
uploadedFiles: new Map<string, Buffer>(),
24+
})
25+
)
2326

2427
const MATERIALIZATION_CONTEXT = {
2528
workspaceId: 'workspace-1',
@@ -35,6 +38,10 @@ vi.mock('@/lib/uploads', () => ({
3538
},
3639
}))
3740

41+
vi.mock('@/lib/execution/payloads/large-value-metadata', () => ({
42+
registerLargeValueOwner: mockRegisterLargeValueOwner,
43+
}))
44+
3845
function buildExecutionResult(overrides: Partial<ExecutionResult> = {}): ExecutionResult {
3946
return {
4047
success: true,
@@ -66,6 +73,7 @@ describe('Response block gating by auth type', () => {
6673
vi.clearAllMocks()
6774
clearLargeValueCacheForTests()
6875
uploadedFiles.clear()
76+
mockRegisterLargeValueOwner.mockResolvedValue(true)
6977
mockUploadFile.mockImplementation(async ({ customKey, file }) => {
7078
uploadedFiles.set(customKey, file)
7179
return { key: customKey }
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
5+
import { beforeEach, describe, expect, it, vi } from 'vitest'
6+
7+
interface CleanupRow {
8+
id: string
9+
files: unknown
10+
}
11+
12+
interface CapturedBatchDeleteOptions {
13+
selectChunk: (chunkIds: string[], limit: number) => Promise<unknown>
14+
onBatch?: (rows: CleanupRow[]) => Promise<void>
15+
batchSize?: number
16+
maxBatches?: number
17+
totalRowLimit?: number
18+
}
19+
20+
const {
21+
mockAnd,
22+
mockBatchDeleteByWorkspaceAndTimestamp,
23+
mockChunkedBatchDelete,
24+
mockDeleteFileMetadata,
25+
mockDeleteFiles,
26+
mockEq,
27+
mockExecute,
28+
mockFrom,
29+
mockInArray,
30+
mockIsNull,
31+
mockLeftJoin,
32+
mockLimit,
33+
mockLt,
34+
mockMarkLargeValuesDeleted,
35+
mockNotInArray,
36+
mockOr,
37+
mockOrderBy,
38+
mockPruneLargeValueMetadata,
39+
mockSelect,
40+
mockTask,
41+
mockWhere,
42+
} = vi.hoisted(() => {
43+
const mockLimit = vi.fn(async () => [])
44+
const mockOrderBy = vi.fn(() => ({ limit: mockLimit }))
45+
const mockWhere = vi.fn(() => ({ limit: mockLimit, orderBy: mockOrderBy }))
46+
const mockLeftJoin = vi.fn(() => ({ where: mockWhere }))
47+
const mockFrom = vi.fn(() => ({ leftJoin: mockLeftJoin, where: mockWhere }))
48+
const mockSelect = vi.fn(() => ({ from: mockFrom }))
49+
50+
return {
51+
mockAnd: vi.fn((...args: unknown[]) => ({ op: 'and', args })),
52+
mockBatchDeleteByWorkspaceAndTimestamp: vi.fn(async () => ({
53+
table: 'job',
54+
deleted: 0,
55+
failed: 0,
56+
})),
57+
mockChunkedBatchDelete: vi.fn(),
58+
mockDeleteFileMetadata: vi.fn(async () => true),
59+
mockDeleteFiles: vi.fn(async () => ({ deleted: 2, failed: [] })),
60+
mockEq: vi.fn((...args: unknown[]) => ({ op: 'eq', args })),
61+
mockExecute: vi.fn(),
62+
mockFrom,
63+
mockInArray: vi.fn((...args: unknown[]) => ({ op: 'inArray', args })),
64+
mockIsNull: vi.fn((...args: unknown[]) => ({ op: 'isNull', args })),
65+
mockLeftJoin,
66+
mockLimit,
67+
mockLt: vi.fn((...args: unknown[]) => ({ op: 'lt', args })),
68+
mockMarkLargeValuesDeleted: vi.fn(async () => undefined),
69+
mockNotInArray: vi.fn((...args: unknown[]) => ({ op: 'notInArray', args })),
70+
mockOr: vi.fn((...args: unknown[]) => ({ op: 'or', args })),
71+
mockOrderBy,
72+
mockPruneLargeValueMetadata: vi.fn(async () => ({
73+
referencesDeleted: 0,
74+
dependenciesDeleted: 0,
75+
tombstonesDeleted: 0,
76+
})),
77+
mockSelect,
78+
mockTask: vi.fn((config: unknown) => config),
79+
mockWhere,
80+
}
81+
})
82+
83+
vi.mock('@sim/db', () => ({
84+
db: {
85+
execute: mockExecute,
86+
select: mockSelect,
87+
},
88+
}))
89+
90+
vi.mock('@sim/db/schema', () => ({
91+
executionLargeValues: {
92+
createdAt: 'executionLargeValues.createdAt',
93+
deletedAt: 'executionLargeValues.deletedAt',
94+
key: 'executionLargeValues.key',
95+
workspaceId: 'executionLargeValues.workspaceId',
96+
},
97+
jobExecutionLogs: {
98+
startedAt: 'jobExecutionLogs.startedAt',
99+
workspaceId: 'jobExecutionLogs.workspaceId',
100+
},
101+
pausedExecutions: {
102+
executionId: 'pausedExecutions.executionId',
103+
status: 'pausedExecutions.status',
104+
},
105+
workflowExecutionLogs: {
106+
executionData: 'workflowExecutionLogs.executionData',
107+
executionId: 'workflowExecutionLogs.executionId',
108+
files: 'workflowExecutionLogs.files',
109+
id: 'workflowExecutionLogs.id',
110+
startedAt: 'workflowExecutionLogs.startedAt',
111+
workspaceId: 'workflowExecutionLogs.workspaceId',
112+
},
113+
}))
114+
115+
vi.mock('@sim/logger', () => ({
116+
createLogger: vi.fn(() => ({
117+
error: vi.fn(),
118+
info: vi.fn(),
119+
warn: vi.fn(),
120+
})),
121+
}))
122+
123+
vi.mock('@trigger.dev/sdk', () => ({ task: mockTask }))
124+
125+
vi.mock('drizzle-orm', () => ({
126+
and: mockAnd,
127+
asc: vi.fn((column: unknown) => ({ op: 'asc', column })),
128+
eq: mockEq,
129+
inArray: mockInArray,
130+
isNull: mockIsNull,
131+
lt: mockLt,
132+
notInArray: mockNotInArray,
133+
or: mockOr,
134+
}))
135+
136+
vi.mock('@/lib/cleanup/batch-delete', () => ({
137+
batchDeleteByWorkspaceAndTimestamp: mockBatchDeleteByWorkspaceAndTimestamp,
138+
chunkArray: (items: string[], size: number) => {
139+
const chunks: string[][] = []
140+
for (let index = 0; index < items.length; index += size) {
141+
chunks.push(items.slice(index, index + size))
142+
}
143+
return chunks
144+
},
145+
chunkedBatchDelete: mockChunkedBatchDelete,
146+
}))
147+
148+
vi.mock('@/lib/execution/payloads/large-value-metadata', () => ({
149+
markLargeValuesDeleted: mockMarkLargeValuesDeleted,
150+
pruneLargeValueMetadata: mockPruneLargeValueMetadata,
151+
unreferencedLargeValuePredicate: vi.fn(() => ({ op: 'unreferencedLargeValuePredicate' })),
152+
}))
153+
154+
vi.mock('@/lib/logs/execution/snapshot/service', () => ({
155+
snapshotService: {
156+
cleanupOrphanedSnapshots: vi.fn(async () => 0),
157+
},
158+
}))
159+
160+
vi.mock('@/lib/uploads', () => ({
161+
isUsingCloudStorage: vi.fn(() => true),
162+
StorageService: {
163+
deleteFiles: mockDeleteFiles,
164+
},
165+
}))
166+
167+
vi.mock('@/lib/uploads/server/metadata', () => ({
168+
deleteFileMetadata: mockDeleteFileMetadata,
169+
}))
170+
171+
import { cleanupLogsTask, runCleanupLogs } from '@/background/cleanup-logs'
172+
173+
describe('cleanup logs worker', () => {
174+
beforeEach(() => {
175+
vi.clearAllMocks()
176+
mockChunkedBatchDelete.mockImplementation(async (options: CapturedBatchDeleteOptions) => {
177+
await options.selectChunk(['workspace-1'], 500)
178+
await options.onBatch?.([
179+
{
180+
id: 'log-1',
181+
files: [
182+
{ key: 'execution-file-a' },
183+
{ key: 'execution-file-a' },
184+
{ key: 'execution-file-b' },
185+
],
186+
},
187+
])
188+
return { table: 'workflow_execution_logs', deleted: 1, failed: 0 }
189+
})
190+
})
191+
192+
it('cleans logs without selecting execution_data or scanning refs', async () => {
193+
await runCleanupLogs({
194+
label: 'free/1',
195+
plan: 'free',
196+
retentionHours: 720,
197+
workspaceIds: ['workspace-1'],
198+
})
199+
200+
expect(mockChunkedBatchDelete).toHaveBeenCalledWith(
201+
expect.objectContaining({
202+
batchSize: 500,
203+
maxBatches: 50,
204+
totalRowLimit: 25_000,
205+
})
206+
)
207+
expect(mockSelect).toHaveBeenCalledWith({
208+
id: 'workflowExecutionLogs.id',
209+
files: 'workflowExecutionLogs.files',
210+
})
211+
expect(mockExecute).not.toHaveBeenCalled()
212+
expect(mockDeleteFiles).toHaveBeenCalledWith(
213+
['execution-file-a', 'execution-file-b'],
214+
'execution'
215+
)
216+
expect(mockDeleteFileMetadata).toHaveBeenCalledTimes(2)
217+
expect(mockPruneLargeValueMetadata).toHaveBeenCalledWith(
218+
expect.objectContaining({ workspaceIds: ['workspace-1'] })
219+
)
220+
expect(mockBatchDeleteByWorkspaceAndTimestamp).toHaveBeenCalledOnce()
221+
})
222+
223+
it('does not count large values as deleted when deleted_at marking fails', async () => {
224+
const largeValueKey =
225+
'execution/workspace-1/workflow-1/execution-1/large-value-lv_abcdefghijkl.json'
226+
mockLimit.mockResolvedValueOnce([]).mockResolvedValueOnce([{ key: largeValueKey }])
227+
mockDeleteFiles
228+
.mockResolvedValueOnce({ deleted: 2, failed: [] })
229+
.mockResolvedValueOnce({ deleted: 1, failed: [] })
230+
mockMarkLargeValuesDeleted.mockRejectedValueOnce(new Error('db unavailable'))
231+
232+
await runCleanupLogs({
233+
label: 'free/1',
234+
plan: 'free',
235+
retentionHours: 720,
236+
workspaceIds: ['workspace-1'],
237+
})
238+
239+
expect(mockMarkLargeValuesDeleted).toHaveBeenCalledWith([largeValueKey])
240+
expect(mockDeleteFileMetadata).toHaveBeenCalledTimes(2)
241+
})
242+
243+
it('caps Trigger.dev concurrency for log cleanup tasks', () => {
244+
expect(cleanupLogsTask).toMatchObject({
245+
queue: { concurrencyLimit: 2 },
246+
})
247+
})
248+
})

0 commit comments

Comments
 (0)