Skip to content

Commit 06e7b4f

Browse files
committed
address comments
1 parent ccfda75 commit 06e7b4f

6 files changed

Lines changed: 133 additions & 26 deletions

File tree

apps/sim/lib/execution/payloads/large-value-metadata.test.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const {
2727
mockValues,
2828
mockWhere,
2929
mockTxWhere,
30+
mockNotInArray,
3031
} = vi.hoisted(() => {
3132
const mockOnConflictDoNothing = vi.fn(async () => undefined)
3233
const mockValues = vi.fn(() => ({ onConflictDoNothing: mockOnConflictDoNothing }))
@@ -54,6 +55,7 @@ const {
5455
mockEq: vi.fn((...args: unknown[]) => ({ op: 'eq', args })),
5556
mockExecute: vi.fn(async () => [{ count: 0 }]),
5657
mockInsert,
58+
mockNotInArray: vi.fn((...args: unknown[]) => ({ op: 'notInArray', args })),
5759
mockOnConflictDoNothing,
5860
mockSelect,
5961
mockSelectFrom,
@@ -127,6 +129,7 @@ vi.mock('drizzle-orm', () => ({
127129
and: mockAnd,
128130
eq: mockEq,
129131
inArray: vi.fn((...args: unknown[]) => ({ op: 'inArray', args })),
132+
notInArray: mockNotInArray,
130133
sql: vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ strings, values })),
131134
}))
132135

@@ -286,6 +289,38 @@ describe('large value metadata', () => {
286289
expect(mockTxSelectLimit).toHaveBeenCalledWith(MAX_LARGE_VALUE_REFERENCES_PER_SCOPE)
287290
})
288291

292+
it('filters known dependency children before applying the remaining reference budget', async () => {
293+
const directKeys = Array.from({ length: MAX_LARGE_VALUE_REFERENCES_PER_SCOPE }, (_, index) =>
294+
largeValueKey(`e${index.toString(36).padStart(11, '0')}`)
295+
)
296+
const knownChildKey = directKeys[1]
297+
const unseenChildKey = largeValueKey('unseenchild1', 'source-execution')
298+
mockTxSelectLimit.mockImplementationOnce(async () => {
299+
const filtersKnownChildren = mockNotInArray.mock.calls.some(
300+
([field, values]) =>
301+
field === 'executionLargeValueDependencies.childKey' &&
302+
Array.isArray(values) &&
303+
values.includes(knownChildKey)
304+
)
305+
return [{ childKey: filtersKnownChildren ? unseenChildKey : knownChildKey }]
306+
})
307+
308+
await expect(
309+
registerLargeValueOwner(
310+
{
311+
key: largeValueKey('zyxwvutsrqpo', 'execution-1'),
312+
workspaceId: 'workspace-1',
313+
workflowId: 'workflow-1',
314+
executionId: 'execution-1',
315+
size: 123,
316+
},
317+
directKeys
318+
)
319+
).rejects.toThrow('Large value dependency closure exceeds the limit')
320+
321+
expect(mockTxSelectLimit).toHaveBeenCalledWith(1)
322+
})
323+
289324
it('replaces an execution reference set with same-workspace unique keys', async () => {
290325
const matchingKey = largeValueKey('abcdefghijkl')
291326
const otherWorkspaceKey =

apps/sim/lib/execution/payloads/large-value-metadata.ts

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
workflowExecutionLogs,
88
} from '@sim/db/schema'
99
import { createLogger } from '@sim/logger'
10-
import { and, eq, inArray, sql } from 'drizzle-orm'
10+
import { and, eq, inArray, notInArray, sql } from 'drizzle-orm'
1111
import { collectLargeValueKeys } from '@/lib/execution/payloads/large-execution-value'
1212

1313
const logger = createLogger('LargeValueMetadata')
@@ -101,6 +101,16 @@ function getCount(rows: unknown): number {
101101
return Number((row as { count: unknown }).count) || 0
102102
}
103103

104+
export function collectLargeValueReferenceKeys(value: unknown, workspaceId?: string): string[] {
105+
return getBoundedUniqueKeys(
106+
collectLargeValueKeys(value).filter((key) => {
107+
const parsed = parseLargeValueStorageKey(key)
108+
return workspaceId ? parsed?.workspaceId === workspaceId : Boolean(parsed)
109+
}),
110+
'Large value reference set'
111+
)
112+
}
113+
104114
async function getDependencyClosure(
105115
client: LargeValueMetadataClient,
106116
ownerKey: string,
@@ -133,7 +143,8 @@ async function getDependencyClosure(
133143
.where(
134144
and(
135145
eq(executionLargeValueDependencies.workspaceId, workspaceId),
136-
inArray(executionLargeValueDependencies.parentKey, keyChunk)
146+
inArray(executionLargeValueDependencies.parentKey, keyChunk),
147+
notInArray(executionLargeValueDependencies.childKey, Array.from(closureKeys))
137148
)
138149
)
139150
.limit(remainingBudget + 1)
@@ -225,14 +236,30 @@ export async function replaceLargeValueReferencesWithClient(
225236
client: LargeValueMetadataClient,
226237
scope: LargeValueReferenceScope,
227238
value: unknown
239+
): Promise<void> {
240+
if (!scope.workspaceId || !scope.executionId) {
241+
return
242+
}
243+
244+
await replaceLargeValueReferenceKeysWithClient(
245+
client,
246+
scope,
247+
collectLargeValueReferenceKeys(value, scope.workspaceId)
248+
)
249+
}
250+
251+
export async function replaceLargeValueReferenceKeysWithClient(
252+
client: LargeValueMetadataClient,
253+
scope: LargeValueReferenceScope,
254+
referenceKeys: string[]
228255
): Promise<void> {
229256
const { workspaceId, workflowId, executionId, source } = scope
230257
if (!workspaceId || !executionId) {
231258
return
232259
}
233260

234261
const keys = getBoundedUniqueKeys(
235-
collectLargeValueKeys(value).filter((key) => {
262+
referenceKeys.filter((key) => {
236263
const parsed = parseLargeValueStorageKey(key)
237264
return parsed?.workspaceId === workspaceId
238265
}),
@@ -340,8 +367,11 @@ export async function replaceLargeValueReferences(
340367
scope: LargeValueReferenceScope,
341368
value: unknown
342369
): Promise<void> {
370+
const referenceKeys = scope.workspaceId
371+
? collectLargeValueReferenceKeys(value, scope.workspaceId)
372+
: []
343373
await db.transaction(async (tx) => {
344-
await replaceLargeValueReferencesWithClient(tx, scope, value)
374+
await replaceLargeValueReferenceKeysWithClient(tx, scope, referenceKeys)
345375
})
346376
}
347377

apps/sim/lib/logs/execution/logger.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
2222
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
2323
import { redactApiKeys } from '@/lib/core/security/redaction'
2424
import { filterForDisplay } from '@/lib/core/utils/display-filters'
25-
import { replaceLargeValueReferencesWithClient } from '@/lib/execution/payloads/large-value-metadata'
25+
import {
26+
collectLargeValueReferenceKeys,
27+
replaceLargeValueReferenceKeysWithClient,
28+
} from '@/lib/execution/payloads/large-value-metadata'
2629
import { emitWorkflowExecutionCompleted } from '@/lib/logs/events'
2730
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
2831
import type {
@@ -740,6 +743,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
740743
},
741744
executionId
742745
)
746+
const completedExecutionLargeValueKeys = collectLargeValueReferenceKeys(completedExecutionData)
743747

744748
const updatedLog = await db.transaction(async (tx) => {
745749
await setExecutionLogWriteTimeouts(tx)
@@ -762,15 +766,15 @@ export class ExecutionLogger implements IExecutionLoggerService {
762766
throw new Error(`Workflow log not found for execution ${executionId}`)
763767
}
764768

765-
await replaceLargeValueReferencesWithClient(
769+
await replaceLargeValueReferenceKeysWithClient(
766770
tx,
767771
{
768772
workspaceId: log.workspaceId,
769773
workflowId: log.workflowId,
770774
executionId,
771775
source: 'execution_log',
772776
},
773-
completedExecutionData
777+
completedExecutionLargeValueKeys
774778
)
775779

776780
return log

apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ import {
1313
resetExecutionStreamBuffer,
1414
type TerminalExecutionStreamStatus,
1515
} from '@/lib/execution/event-buffer'
16-
import { replaceLargeValueReferencesWithClient } from '@/lib/execution/payloads/large-value-metadata'
16+
import {
17+
collectLargeValueReferenceKeys,
18+
replaceLargeValueReferenceKeysWithClient,
19+
} from '@/lib/execution/payloads/large-value-metadata'
1720
import { compactBlockLogs, compactExecutionPayload } from '@/lib/execution/payloads/serializer'
1821
import { preprocessExecution } from '@/lib/execution/preprocessing'
1922
import { LoggingSession } from '@/lib/logs/execution/logging-session'
@@ -189,6 +192,9 @@ export class PauseResumeManager {
189192
const snapshotWorkspaceId = getSnapshotWorkspaceId(
190193
isRecord(snapshotReferenceValue) ? snapshotReferenceValue.snapshot : undefined
191194
)
195+
const snapshotReferenceKeys = snapshotWorkspaceId
196+
? collectLargeValueReferenceKeys(snapshotReferenceValue, snapshotWorkspaceId)
197+
: []
192198

193199
const pausePointsRecord = pausePoints.reduce<Record<string, any>>((acc, point) => {
194200
acc[point.contextId] = {
@@ -241,15 +247,15 @@ export class PauseResumeManager {
241247
nextResumeAt,
242248
})
243249
if (snapshotWorkspaceId) {
244-
await replaceLargeValueReferencesWithClient(
250+
await replaceLargeValueReferenceKeysWithClient(
245251
tx,
246252
{
247253
workspaceId: snapshotWorkspaceId,
248254
workflowId,
249255
executionId,
250256
source: 'paused_snapshot',
251257
},
252-
snapshotReferenceValue
258+
snapshotReferenceKeys
253259
)
254260
}
255261
return
@@ -298,15 +304,15 @@ export class PauseResumeManager {
298304
.where(eq(pausedExecutions.id, existing.id))
299305

300306
if (snapshotWorkspaceId) {
301-
await replaceLargeValueReferencesWithClient(
307+
await replaceLargeValueReferenceKeysWithClient(
302308
tx,
303309
{
304310
workspaceId: snapshotWorkspaceId,
305311
workflowId,
306312
executionId,
307313
source: 'paused_snapshot',
308314
},
309-
snapshotReferenceValue
315+
snapshotReferenceKeys
310316
)
311317
}
312318
})
@@ -1614,6 +1620,10 @@ export class PauseResumeManager {
16141620
triggerIds: currentSnapshot.triggerIds,
16151621
}
16161622
const snapshotWorkspaceId = getSnapshotWorkspaceId(snapshotData)
1623+
const snapshotReferenceValue = { ...updatedSnapshot, snapshot: snapshotData }
1624+
const snapshotReferenceKeys = snapshotWorkspaceId
1625+
? collectLargeValueReferenceKeys(snapshotReferenceValue, snapshotWorkspaceId)
1626+
: []
16171627

16181628
await db.transaction(async (tx) => {
16191629
await tx
@@ -1625,15 +1635,15 @@ export class PauseResumeManager {
16251635
.where(eq(pausedExecutions.id, pausedExecutionId))
16261636

16271637
if (snapshotWorkspaceId) {
1628-
await replaceLargeValueReferencesWithClient(
1638+
await replaceLargeValueReferenceKeysWithClient(
16291639
tx,
16301640
{
16311641
workspaceId: snapshotWorkspaceId,
16321642
workflowId: pausedExecution.workflowId,
16331643
executionId: pausedExecution.executionId,
16341644
source: 'paused_snapshot',
16351645
},
1636-
{ ...updatedSnapshot, snapshot: snapshotData }
1646+
snapshotReferenceKeys
16371647
)
16381648
}
16391649
})

packages/testing/src/mocks/database.mock.ts

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -234,20 +234,25 @@ export const dbChainMock = {
234234
* Creates a mock database connection.
235235
*/
236236
export function createMockDb() {
237+
const fromBuilder = () => ({
238+
where: vi.fn(() => ({
239+
limit: vi.fn(() => Promise.resolve([])),
240+
orderBy: vi.fn(() => Promise.resolve([])),
241+
})),
242+
leftJoin: vi.fn(() => ({
243+
where: vi.fn(() => Promise.resolve([])),
244+
})),
245+
innerJoin: vi.fn(() => ({
246+
where: vi.fn(() => Promise.resolve([])),
247+
})),
248+
})
249+
237250
return {
238251
select: vi.fn(() => ({
239-
from: vi.fn(() => ({
240-
where: vi.fn(() => ({
241-
limit: vi.fn(() => Promise.resolve([])),
242-
orderBy: vi.fn(() => Promise.resolve([])),
243-
})),
244-
leftJoin: vi.fn(() => ({
245-
where: vi.fn(() => Promise.resolve([])),
246-
})),
247-
innerJoin: vi.fn(() => ({
248-
where: vi.fn(() => Promise.resolve([])),
249-
})),
250-
})),
252+
from: vi.fn(fromBuilder),
253+
})),
254+
selectDistinct: vi.fn(() => ({
255+
from: vi.fn(fromBuilder),
251256
})),
252257
insert: vi.fn(() => ({
253258
values: vi.fn(() => ({

packages/testing/src/mocks/schema.mock.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,29 @@ export const schemaMock = {
157157
files: 'files',
158158
createdAt: 'createdAt',
159159
},
160+
executionLargeValues: {
161+
key: 'key',
162+
workspaceId: 'workspaceId',
163+
workflowId: 'workflowId',
164+
ownerExecutionId: 'ownerExecutionId',
165+
size: 'size',
166+
createdAt: 'createdAt',
167+
deletedAt: 'deletedAt',
168+
},
169+
executionLargeValueReferences: {
170+
key: 'key',
171+
executionId: 'executionId',
172+
source: 'source',
173+
workspaceId: 'workspaceId',
174+
workflowId: 'workflowId',
175+
createdAt: 'createdAt',
176+
},
177+
executionLargeValueDependencies: {
178+
parentKey: 'parentKey',
179+
childKey: 'childKey',
180+
workspaceId: 'workspaceId',
181+
createdAt: 'createdAt',
182+
},
160183
pausedExecutions: {
161184
id: 'id',
162185
workflowId: 'workflowId',

0 commit comments

Comments
 (0)