Skip to content

Commit 1e5a746

Browse files
committed
fix snapshot for nested subflows
1 parent a10cafa commit 1e5a746

10 files changed

Lines changed: 406 additions & 55 deletions

File tree

apps/sim/app/workspace/[workspaceId]/w/components/preview/components/preview-workflow/preview-workflow.tsx

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -108,26 +108,24 @@ export function getLeftmostBlockId(workflowState: WorkflowState | null | undefin
108108
/** Execution status for edges/nodes in the preview */
109109
type ExecutionStatus = 'success' | 'error' | 'not-executed'
110110

111-
/** Calculates absolute position, handling nested subflows. */
112-
function calculateAbsolutePosition(
111+
/** Calculates nesting depth, handling nested subflows. */
112+
function calculateNestingDepth(
113113
block: BlockState,
114-
blocks: Record<string, BlockState>
115-
): { x: number; y: number } {
116-
if (!block.data?.parentId) {
117-
return block.position
118-
}
119-
120-
const parentBlock = blocks[block.data.parentId]
114+
blocks: Record<string, BlockState>,
115+
visited: Set<string> = new Set()
116+
): number {
117+
const parentId = block.data?.parentId
118+
if (!parentId) return 0
119+
if (visited.has(parentId)) return 0
120+
121+
const parentBlock = blocks[parentId]
121122
if (!parentBlock) {
122-
logger.warn(`Parent block not found for child block`)
123-
return block.position
123+
logger.warn('Parent block not found for child block')
124+
return 0
124125
}
125126

126-
const parentAbsolutePosition = calculateAbsolutePosition(parentBlock, blocks)
127-
return {
128-
x: parentAbsolutePosition.x + block.position.x,
129-
y: parentAbsolutePosition.y + block.position.y,
130-
}
127+
visited.add(parentId)
128+
return 1 + calculateNestingDepth(parentBlock, blocks, visited)
131129
}
132130

133131
interface PreviewWorkflowProps {
@@ -329,7 +327,8 @@ export function PreviewWorkflow({
329327

330328
if (childStatuses.length === 0) return undefined
331329
if (childStatuses.some((s) => s === 'error')) return 'error'
332-
return 'success'
330+
if (childStatuses.every((s) => s === 'success')) return 'success'
331+
return undefined
333332
}
334333
return derive
335334
}, [subflowChildrenMap, blockExecutionMap, workflowState.blocks])
@@ -367,13 +366,20 @@ export function PreviewWorkflow({
367366

368367
const nodeArray: Node[] = []
369368

370-
Object.entries(workflowState.blocks || {}).forEach(([blockId, block]) => {
369+
const sortedBlocks = Object.entries(workflowState.blocks || {}).sort(
370+
([, left], [, right]) =>
371+
calculateNestingDepth(left, workflowState.blocks) -
372+
calculateNestingDepth(right, workflowState.blocks)
373+
)
374+
375+
sortedBlocks.forEach(([blockId, block]) => {
371376
if (!block || !block.type) {
372377
logger.warn(`Skipping invalid block: ${blockId}`)
373378
return
374379
}
375380

376-
const absolutePosition = calculateAbsolutePosition(block, workflowState.blocks)
381+
const parentId = block.data?.parentId
382+
const nestingDepth = calculateNestingDepth(block, workflowState.blocks)
377383

378384
if (block.type === 'loop' || block.type === 'parallel') {
379385
const isSelected = selectedBlockId === blockId
@@ -391,9 +397,14 @@ export function PreviewWorkflow({
391397
nodeArray.push({
392398
id: blockId,
393399
type: 'subflowNode',
394-
position: absolutePosition,
400+
position: block.position,
401+
parentId,
402+
extent: block.data?.extent || undefined,
395403
draggable: false,
404+
zIndex: nestingDepth,
405+
className: parentId ? 'nested-subflow-node' : undefined,
396406
data: {
407+
...block.data,
397408
name: block.name,
398409
width: dimensions.width,
399410
height: dimensions.height,
@@ -430,9 +441,11 @@ export function PreviewWorkflow({
430441
nodeArray.push({
431442
id: blockId,
432443
type: nodeType,
433-
position: absolutePosition,
444+
position: block.position,
445+
parentId,
446+
extent: block.data?.extent || undefined,
434447
draggable: false,
435-
zIndex: block.data?.parentId ? 10 : undefined,
448+
zIndex: parentId ? 1000 : undefined,
436449
data: {
437450
type: block.type,
438451
name: block.name,

apps/sim/app/workspace/[workspaceId]/w/components/preview/preview.tsx

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ interface BlockExecutionData {
3434
childWorkflowSnapshotId?: string
3535
}
3636

37+
function isPauseOutput(output: unknown): boolean {
38+
return (
39+
output !== null &&
40+
typeof output === 'object' &&
41+
'_pauseMetadata' in output &&
42+
(output as Record<string, unknown>)._pauseMetadata !== undefined
43+
)
44+
}
45+
3746
/** Represents a level in the workflow navigation stack */
3847
interface WorkflowStackEntry {
3948
workflowState: WorkflowState
@@ -91,7 +100,7 @@ export function buildBlockExecutions(spans: TraceSpan[]): Record<string, BlockEx
91100
blockExecutionMap[span.blockId] = {
92101
input: redactApiKeys(span.input || {}),
93102
output: redactApiKeys(span.output || {}),
94-
status: span.status || 'unknown',
103+
status: isPauseOutput(span.output) ? 'pending' : span.status || 'unknown',
95104
durationMs: span.duration || 0,
96105
children: span.children,
97106
childWorkflowSnapshotId: span.childWorkflowSnapshotId,

apps/sim/executor/execution/block-executor.test.ts

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,4 +232,151 @@ describe('BlockExecutor', () => {
232232
expect(state.getBlockOutput('function-block-1__obranch-0')).toBeUndefined()
233233
expect(state.getBlockOutput('function-block-1₍0₎')).toBeUndefined()
234234
})
235+
236+
it('does not let block completion callbacks overtake pending start callbacks', async () => {
237+
const block = createBlock()
238+
const workflow: SerializedWorkflow = {
239+
version: '1',
240+
blocks: [block],
241+
connections: [],
242+
loops: {},
243+
parallels: {},
244+
}
245+
const state = new ExecutionState()
246+
const resolver = new VariableResolver(workflow, {}, state)
247+
const output = { result: 'done' }
248+
const execute = vi.fn(async () => {
249+
events.push('execute')
250+
return output
251+
})
252+
const handler: BlockHandler = {
253+
canHandle: () => true,
254+
execute,
255+
}
256+
257+
const events: string[] = []
258+
let resolveStart!: () => void
259+
const startGate = new Promise<void>((resolve) => {
260+
resolveStart = resolve
261+
})
262+
const onBlockStart = vi.fn(async () => {
263+
events.push('start-called')
264+
await startGate
265+
events.push('start-done')
266+
})
267+
const onBlockComplete = vi.fn(async () => {
268+
events.push('complete')
269+
})
270+
271+
const executor = new BlockExecutor(
272+
[handler],
273+
resolver,
274+
{
275+
workspaceId: 'workspace-1',
276+
executionId: 'execution-1',
277+
userId: 'user-1',
278+
metadata: {
279+
requestId: 'request-1',
280+
executionId: 'execution-1',
281+
workflowId: 'workflow-1',
282+
workspaceId: 'workspace-1',
283+
userId: 'user-1',
284+
triggerType: 'manual',
285+
useDraftState: false,
286+
startTime: new Date().toISOString(),
287+
},
288+
onBlockStart,
289+
onBlockComplete,
290+
},
291+
state
292+
)
293+
294+
const execution = executor.execute(createContext(state), createNode(block), block)
295+
296+
expect(onBlockStart).toHaveBeenCalled()
297+
expect(execute).not.toHaveBeenCalled()
298+
expect(onBlockComplete).not.toHaveBeenCalled()
299+
300+
resolveStart()
301+
302+
await execution
303+
await vi.waitFor(() => {
304+
expect(onBlockComplete).toHaveBeenCalled()
305+
})
306+
expect(events).toEqual(['start-called', 'start-done', 'execute', 'complete'])
307+
})
308+
309+
it('fires block completion callbacks for pausing blocks so clients receive pause output', async () => {
310+
const block = {
311+
...createBlock(),
312+
id: 'hitl-block-1',
313+
metadata: { id: BlockType.HUMAN_IN_THE_LOOP, name: 'Human in the Loop' },
314+
config: { tool: BlockType.HUMAN_IN_THE_LOOP, params: {} },
315+
}
316+
const workflow: SerializedWorkflow = {
317+
version: '1',
318+
blocks: [block],
319+
connections: [],
320+
loops: {},
321+
parallels: {},
322+
}
323+
const state = new ExecutionState()
324+
const resolver = new VariableResolver(workflow, {}, state)
325+
const output = {
326+
response: { status: 'paused' },
327+
_pauseMetadata: {
328+
contextId: 'pause-context-1',
329+
blockId: block.id,
330+
response: { status: 'paused' },
331+
timestamp: new Date().toISOString(),
332+
pauseKind: 'human' as const,
333+
},
334+
}
335+
const handler: BlockHandler = {
336+
canHandle: () => true,
337+
execute: async () => output,
338+
}
339+
const onBlockStart = vi.fn(async () => {})
340+
const onBlockComplete = vi.fn(async () => {})
341+
342+
const executor = new BlockExecutor(
343+
[handler],
344+
resolver,
345+
{
346+
workspaceId: 'workspace-1',
347+
executionId: 'execution-1',
348+
userId: 'user-1',
349+
metadata: {
350+
requestId: 'request-1',
351+
executionId: 'execution-1',
352+
workflowId: 'workflow-1',
353+
workspaceId: 'workspace-1',
354+
userId: 'user-1',
355+
triggerType: 'manual',
356+
useDraftState: false,
357+
startTime: new Date().toISOString(),
358+
},
359+
onBlockStart,
360+
onBlockComplete,
361+
},
362+
state
363+
)
364+
365+
await executor.execute(createContext(state), createNode(block), block)
366+
367+
expect(onBlockStart).toHaveBeenCalled()
368+
expect(onBlockComplete).toHaveBeenCalledWith(
369+
block.id,
370+
'Human in the Loop',
371+
BlockType.HUMAN_IN_THE_LOOP,
372+
expect.objectContaining({
373+
output: expect.objectContaining({
374+
response: { status: 'paused' },
375+
}),
376+
}),
377+
undefined,
378+
undefined
379+
)
380+
expect(state.getBlockOutput(block.id)).toEqual(output)
381+
})
235382
})

0 commit comments

Comments
 (0)