Skip to content

Commit bc045e4

Browse files
committed
address comments
1 parent ebdaa84 commit bc045e4

33 files changed

Lines changed: 1316 additions & 309 deletions

apps/sim/executor/dag/construction/edges.test.ts

Lines changed: 89 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { beforeEach, describe, expect, it } from 'vitest'
22
import type { DAG, DAGNode } from '@/executor/dag/builder'
3+
import { buildBranchNodeId } from '@/executor/utils/subflow-utils'
34
import type { SerializedBlock, SerializedLoop, SerializedWorkflow } from '@/serializer/types'
45
import { EdgeConstructor } from './edges'
56

@@ -133,6 +134,94 @@ describe('EdgeConstructor', () => {
133134
})
134135
})
135136

137+
describe('nested subflow skip-at-start bypasses', () => {
138+
it('wires a nested loop start exit to the next sibling inside a parallel branch', () => {
139+
const parallelId = 'parallel-1'
140+
const loopId = 'loop-1'
141+
const afterId = 'after'
142+
const loopStartId = `loop-${loopId}-sentinel-start`
143+
const loopEndId = `loop-${loopId}-sentinel-end`
144+
const afterTemplateId = buildBranchNodeId(afterId, 0)
145+
const dag = createMockDAG([loopStartId, loopEndId, afterTemplateId])
146+
dag.nodes.get(loopStartId)!.metadata = {
147+
isSentinel: true,
148+
sentinelType: 'start',
149+
subflowId: loopId,
150+
subflowType: 'loop',
151+
}
152+
dag.nodes.get(loopEndId)!.metadata = {
153+
isSentinel: true,
154+
sentinelType: 'end',
155+
subflowId: loopId,
156+
subflowType: 'loop',
157+
}
158+
dag.nodes.get(afterTemplateId)!.metadata = {
159+
isParallelBranch: true,
160+
subflowId: parallelId,
161+
subflowType: 'parallel',
162+
branchIndex: 0,
163+
}
164+
dag.loopConfigs.set(loopId, { id: loopId, nodes: [], iterations: 1 })
165+
dag.parallelConfigs.set(parallelId, {
166+
id: parallelId,
167+
nodes: [loopId, afterId],
168+
count: 1,
169+
})
170+
171+
const workflow = createMockWorkflow(
172+
[createMockBlock(loopId, 'loop'), createMockBlock(afterId)],
173+
[{ source: loopId, target: afterId }]
174+
)
175+
176+
edgeConstructor.execute(
177+
workflow,
178+
dag,
179+
new Set([loopId, afterId]),
180+
new Set(),
181+
new Set([loopId, afterId]),
182+
new Map()
183+
)
184+
185+
const loopStartTargets = Array.from(dag.nodes.get(loopStartId)!.outgoingEdges.values())
186+
expect(loopStartTargets).toContainEqual({
187+
target: loopEndId,
188+
sourceHandle: 'loop_exit',
189+
targetHandle: undefined,
190+
})
191+
expect(dag.nodes.get(loopEndId)!.incomingEdges).not.toContain(loopStartId)
192+
})
193+
194+
it('wires a parallel start exit bypass to a downstream parallel sentinel start', () => {
195+
const sourceParallelId = 'parallel-a'
196+
const targetParallelId = 'parallel-b'
197+
const sourceStartId = `parallel-${sourceParallelId}-sentinel-start`
198+
const sourceEndId = `parallel-${sourceParallelId}-sentinel-end`
199+
const targetStartId = `parallel-${targetParallelId}-sentinel-start`
200+
const targetEndId = `parallel-${targetParallelId}-sentinel-end`
201+
const dag = createMockDAG([sourceStartId, sourceEndId, targetStartId, targetEndId])
202+
dag.parallelConfigs.set(sourceParallelId, { id: sourceParallelId, nodes: [], count: 1 })
203+
dag.parallelConfigs.set(targetParallelId, { id: targetParallelId, nodes: [], count: 1 })
204+
205+
const workflow = createMockWorkflow(
206+
[
207+
createMockBlock(sourceParallelId, 'parallel'),
208+
createMockBlock(targetParallelId, 'parallel'),
209+
],
210+
[{ source: sourceParallelId, target: targetParallelId }]
211+
)
212+
213+
edgeConstructor.execute(workflow, dag, new Set(), new Set(), new Set(), new Map())
214+
215+
const sourceStartTargets = Array.from(dag.nodes.get(sourceStartId)!.outgoingEdges.values())
216+
expect(sourceStartTargets).toContainEqual({
217+
target: sourceEndId,
218+
sourceHandle: 'parallel_exit',
219+
targetHandle: undefined,
220+
})
221+
expect(dag.nodes.get(sourceEndId)!.incomingEdges).not.toContain(sourceStartId)
222+
})
223+
})
224+
136225
describe('Condition block edge wiring', () => {
137226
it('should wire condition block edges with proper condition prefixes', () => {
138227
const conditionId = 'condition-1'
@@ -930,7 +1019,6 @@ describe('EdgeConstructor', () => {
9301019
)
9311020
expect(edgesToParallelStart.length).toBe(1)
9321021
expect(edgesToParallelStart[0].sourceHandle).toBe('parallel_continue')
933-
expect(edgesToParallelStart[0].isActive).toBe(false)
9341022

9351023
const parallelStartNode = dag.nodes.get(parallelSentinelStart)!
9361024
expect(parallelStartNode.incomingEdges.has(parallelSentinelEnd)).toBe(false)
@@ -1367,34 +1455,29 @@ describe('EdgeConstructor', () => {
13671455
dag.nodes.get(outerSentinelStart)!.metadata = {
13681456
isSentinel: true,
13691457
sentinelType: 'start',
1370-
parallelId: outerParallelId,
13711458
subflowId: outerParallelId,
13721459
subflowType: 'parallel',
13731460
}
13741461
dag.nodes.get(outerSentinelEnd)!.metadata = {
13751462
isSentinel: true,
13761463
sentinelType: 'end',
1377-
parallelId: outerParallelId,
13781464
subflowId: outerParallelId,
13791465
subflowType: 'parallel',
13801466
}
13811467
dag.nodes.get(innerSentinelStart)!.metadata = {
13821468
isSentinel: true,
13831469
sentinelType: 'start',
1384-
parallelId: innerParallelId,
13851470
subflowId: innerParallelId,
13861471
subflowType: 'parallel',
13871472
}
13881473
dag.nodes.get(innerSentinelEnd)!.metadata = {
13891474
isSentinel: true,
13901475
sentinelType: 'end',
1391-
parallelId: innerParallelId,
13921476
subflowId: innerParallelId,
13931477
subflowType: 'parallel',
13941478
}
13951479
dag.nodes.get(funcTemplate)!.metadata = {
13961480
isParallelBranch: true,
1397-
parallelId: innerParallelId,
13981481
subflowId: innerParallelId,
13991482
subflowType: 'parallel',
14001483
branchIndex: 0,
@@ -1494,34 +1577,29 @@ describe('EdgeConstructor', () => {
14941577
dag.nodes.get(loopSentinelStart)!.metadata = {
14951578
isSentinel: true,
14961579
sentinelType: 'start',
1497-
loopId,
14981580
subflowId: loopId,
14991581
subflowType: 'loop',
15001582
}
15011583
dag.nodes.get(loopSentinelEnd)!.metadata = {
15021584
isSentinel: true,
15031585
sentinelType: 'end',
1504-
loopId,
15051586
subflowId: loopId,
15061587
subflowType: 'loop',
15071588
}
15081589
dag.nodes.get(parallelSentinelStart)!.metadata = {
15091590
isSentinel: true,
15101591
sentinelType: 'start',
1511-
parallelId: innerParallelId,
15121592
subflowId: innerParallelId,
15131593
subflowType: 'parallel',
15141594
}
15151595
dag.nodes.get(parallelSentinelEnd)!.metadata = {
15161596
isSentinel: true,
15171597
sentinelType: 'end',
1518-
parallelId: innerParallelId,
15191598
subflowId: innerParallelId,
15201599
subflowType: 'parallel',
15211600
}
15221601
dag.nodes.get(funcTemplate)!.metadata = {
15231602
isParallelBranch: true,
1524-
parallelId: innerParallelId,
15251603
subflowId: innerParallelId,
15261604
subflowType: 'parallel',
15271605
branchIndex: 0,
@@ -1617,28 +1695,24 @@ describe('EdgeConstructor', () => {
16171695
dag.nodes.get(outerSentinelStart)!.metadata = {
16181696
isSentinel: true,
16191697
sentinelType: 'start',
1620-
parallelId: outerParallelId,
16211698
subflowId: outerParallelId,
16221699
subflowType: 'parallel',
16231700
}
16241701
dag.nodes.get(outerSentinelEnd)!.metadata = {
16251702
isSentinel: true,
16261703
sentinelType: 'end',
1627-
parallelId: outerParallelId,
16281704
subflowId: outerParallelId,
16291705
subflowType: 'parallel',
16301706
}
16311707
dag.nodes.get(innerSentinelStart)!.metadata = {
16321708
isSentinel: true,
16331709
sentinelType: 'start',
1634-
loopId: innerLoopId,
16351710
subflowId: innerLoopId,
16361711
subflowType: 'loop',
16371712
}
16381713
dag.nodes.get(innerSentinelEnd)!.metadata = {
16391714
isSentinel: true,
16401715
sentinelType: 'end',
1641-
loopId: innerLoopId,
16421716
subflowId: innerLoopId,
16431717
subflowType: 'loop',
16441718
}

apps/sim/executor/dag/construction/edges.ts

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,19 @@ export class EdgeConstructor {
223223
const sourceIsParallelBlock = parallelBlockIds.has(source)
224224
const targetIsParallelBlock = parallelBlockIds.has(target)
225225

226+
if (this.edgeStaysWithinSameParallel(originalSource, originalTarget, dag)) {
227+
const sourceId = this.resolveSubflowToSentinelEnd(originalSource, dag)
228+
const targetId = this.resolveSubflowToSentinelStart(originalTarget, dag)
229+
const resolvedSourceHandle = this.resolveParallelChildSourceHandle(
230+
originalSource,
231+
dag,
232+
sourceHandle
233+
)
234+
this.addEdge(dag, sourceId, targetId, resolvedSourceHandle, targetHandle)
235+
this.addSubflowStartExitBypass(dag, originalSource)
236+
continue
237+
}
238+
226239
let loopSentinelStartId: string | undefined
227240

228241
if (sourceIsLoopBlock) {
@@ -265,6 +278,10 @@ export class EdgeConstructor {
265278
target = sentinelStartId
266279
}
267280

281+
if (sourceIsParallelBlock) {
282+
this.addSubflowStartExitBypass(dag, originalSource)
283+
}
284+
268285
if (this.edgeCrossesLoopBoundary(originalSource, originalTarget, blocksInLoops, dag)) {
269286
continue
270287
}
@@ -327,6 +344,7 @@ export class EdgeConstructor {
327344
? EDGE.PARALLEL_EXIT
328345
: EDGE.LOOP_EXIT
329346
this.addEdge(dag, resolvedId, sentinelEndId, handle)
347+
this.addSubflowStartExitBypass(dag, terminalNodeId)
330348
} else {
331349
this.addEdge(dag, resolvedId, sentinelEndId)
332350
}
@@ -361,10 +379,11 @@ export class EdgeConstructor {
361379
for (const terminalNodeId of terminalNodes) {
362380
const sourceId = this.resolveSubflowToSentinelEnd(terminalNodeId, dag)
363381
if (dag.nodes.has(sourceId)) {
364-
// Use the sourceHandle that matches the nested subflow's exit route.
365-
// A nested loop sentinel-end outputs "loop_exit", not "parallel_exit".
366-
const handle = dag.loopConfigs.has(terminalNodeId) ? EDGE.LOOP_EXIT : EDGE.PARALLEL_EXIT
382+
const handle = this.resolveSubflowExitHandle(terminalNodeId, dag)
367383
this.addEdge(dag, sourceId, sentinelEndId, handle)
384+
if (handle) {
385+
this.addSubflowStartExitBypass(dag, terminalNodeId)
386+
}
368387
}
369388
}
370389

@@ -678,13 +697,63 @@ export class EdgeConstructor {
678697
return null
679698
}
680699

700+
private edgeStaysWithinSameParallel(source: string, target: string, dag: DAG): boolean {
701+
const sourceParallelId = this.getParallelId(source, dag)
702+
const targetParallelId = this.getParallelId(target, dag)
703+
return !!sourceParallelId && sourceParallelId === targetParallelId
704+
}
705+
706+
private resolveParallelChildSourceHandle(
707+
source: string,
708+
dag: DAG,
709+
sourceHandle?: string
710+
): string | undefined {
711+
if (dag.parallelConfigs.has(source)) {
712+
return EDGE.PARALLEL_EXIT
713+
}
714+
if (dag.loopConfigs.has(source)) {
715+
return EDGE.LOOP_EXIT
716+
}
717+
return sourceHandle
718+
}
719+
720+
private resolveSubflowExitHandle(nodeId: string, dag: DAG): string | undefined {
721+
if (dag.parallelConfigs.has(nodeId)) {
722+
return EDGE.PARALLEL_EXIT
723+
}
724+
if (dag.loopConfigs.has(nodeId)) {
725+
return EDGE.LOOP_EXIT
726+
}
727+
return undefined
728+
}
729+
730+
private addSubflowStartExitBypass(dag: DAG, subflowId: string): void {
731+
if (dag.parallelConfigs.has(subflowId)) {
732+
const sourceId = buildParallelSentinelStartId(subflowId)
733+
const targetId = buildParallelSentinelEndId(subflowId)
734+
if (dag.nodes.has(sourceId) && dag.nodes.has(targetId)) {
735+
this.addEdge(dag, sourceId, targetId, EDGE.PARALLEL_EXIT, undefined, true, false)
736+
}
737+
return
738+
}
739+
740+
if (dag.loopConfigs.has(subflowId)) {
741+
const sourceId = buildSentinelStartId(subflowId)
742+
const targetId = buildSentinelEndId(subflowId)
743+
if (dag.nodes.has(sourceId) && dag.nodes.has(targetId)) {
744+
this.addEdge(dag, sourceId, targetId, EDGE.LOOP_EXIT, undefined, true, false)
745+
}
746+
}
747+
}
748+
681749
private addEdge(
682750
dag: DAG,
683751
sourceId: string,
684752
targetId: string,
685753
sourceHandle?: string,
686754
targetHandle?: string,
687-
isControlBackEdge = false
755+
isControlBackEdge = false,
756+
registerIncoming = true
688757
): void {
689758
const sourceNode = dag.nodes.get(sourceId)
690759
const targetNode = dag.nodes.get(targetId)
@@ -700,10 +769,9 @@ export class EdgeConstructor {
700769
target: targetId,
701770
sourceHandle,
702771
targetHandle,
703-
isActive: isControlBackEdge ? false : undefined,
704772
})
705773

706-
if (!isControlBackEdge) {
774+
if (!isControlBackEdge && registerIncoming) {
707775
targetNode.incomingEdges.add(sourceId)
708776
}
709777
}

apps/sim/executor/dag/construction/loops.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
import { createLogger } from '@sim/logger'
21
import { BlockType, LOOP } from '@/executor/constants'
32
import type { DAG } from '@/executor/dag/builder'
43
import { createSubflowSentinelNode } from '@/executor/dag/construction/sentinels'
54
import { buildSentinelEndId, buildSentinelStartId } from '@/executor/utils/subflow-utils'
65

7-
const logger = createLogger('LoopConstructor')
8-
96
export class LoopConstructor {
107
execute(dag: DAG, reachableBlocks: Set<string>): void {
118
for (const [loopId, loopConfig] of dag.loopConfigs) {
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { describe, expect, it } from 'vitest'
5+
import { BlockType } from '@/executor/constants'
6+
import type { DAG } from '@/executor/dag/builder'
7+
import { NodeConstructor } from '@/executor/dag/construction/nodes'
8+
import type { SerializedWorkflow } from '@/serializer/types'
9+
10+
describe('NodeConstructor', () => {
11+
it('assigns nested loop nodes to the innermost loop metadata', () => {
12+
const dag: DAG = {
13+
nodes: new Map(),
14+
loopConfigs: new Map([
15+
['outer-loop', { id: 'outer-loop', nodes: ['inner-loop', 'task'], iterations: 1 }],
16+
['inner-loop', { id: 'inner-loop', nodes: ['task'], iterations: 1 }],
17+
]),
18+
parallelConfigs: new Map(),
19+
}
20+
const workflow: SerializedWorkflow = {
21+
version: '1',
22+
blocks: [
23+
{
24+
id: 'task',
25+
position: { x: 0, y: 0 },
26+
config: { tool: '', params: {} },
27+
inputs: {},
28+
outputs: {},
29+
metadata: { id: BlockType.FUNCTION, name: 'Task' },
30+
enabled: true,
31+
},
32+
],
33+
connections: [],
34+
loops: {},
35+
parallels: {},
36+
}
37+
38+
new NodeConstructor().execute(workflow, dag, new Set(['task']))
39+
40+
expect(dag.nodes.get('task')?.metadata).toMatchObject({
41+
isLoopNode: true,
42+
subflowId: 'inner-loop',
43+
subflowType: 'loop',
44+
})
45+
})
46+
})

0 commit comments

Comments
 (0)