Skip to content

Commit ebdaa84

Browse files
committed
improvement(subflows): orchestration consolidation
1 parent bdf9ffc commit ebdaa84

27 files changed

Lines changed: 982 additions & 291 deletions

apps/sim/executor/constants.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,20 @@ export const EDGE = {
7474
DEFAULT: 'default',
7575
} as const
7676

77+
export const SUBFLOW_CONTROL_EDGE_HANDLES = new Set<string>([
78+
EDGE.LOOP_CONTINUE,
79+
EDGE.LOOP_CONTINUE_ALT,
80+
EDGE.LOOP_EXIT,
81+
EDGE.PARALLEL_CONTINUE,
82+
EDGE.PARALLEL_EXIT,
83+
])
84+
85+
export const CONTROL_BACK_EDGE_HANDLES = new Set<string>([
86+
EDGE.LOOP_CONTINUE,
87+
EDGE.LOOP_CONTINUE_ALT,
88+
EDGE.PARALLEL_CONTINUE,
89+
])
90+
7791
export const LOOP = {
7892
TYPE: {
7993
FOR: 'for' as LoopType,

apps/sim/executor/dag/construction/edges.test.ts

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,16 @@ describe('EdgeConstructor', () => {
925925
expect(edgesToRegular.length).toBe(1)
926926
expect(edgesToRegular[0].sourceHandle).toBe('parallel_exit')
927927

928+
const edgesToParallelStart = Array.from(parallelEndNode.outgoingEdges.values()).filter(
929+
(e) => e.target === parallelSentinelStart
930+
)
931+
expect(edgesToParallelStart.length).toBe(1)
932+
expect(edgesToParallelStart[0].sourceHandle).toBe('parallel_continue')
933+
expect(edgesToParallelStart[0].isActive).toBe(false)
934+
935+
const parallelStartNode = dag.nodes.get(parallelSentinelStart)!
936+
expect(parallelStartNode.incomingEdges.has(parallelSentinelEnd)).toBe(false)
937+
928938
const regularBlockNode = dag.nodes.get(regularBlockId)!
929939
expect(regularBlockNode.incomingEdges.has(parallelSentinelEnd)).toBe(true)
930940
})
@@ -1356,31 +1366,37 @@ describe('EdgeConstructor', () => {
13561366
// Set up sentinel metadata
13571367
dag.nodes.get(outerSentinelStart)!.metadata = {
13581368
isSentinel: true,
1359-
isParallelSentinel: true,
13601369
sentinelType: 'start',
13611370
parallelId: outerParallelId,
1371+
subflowId: outerParallelId,
1372+
subflowType: 'parallel',
13621373
}
13631374
dag.nodes.get(outerSentinelEnd)!.metadata = {
13641375
isSentinel: true,
1365-
isParallelSentinel: true,
13661376
sentinelType: 'end',
13671377
parallelId: outerParallelId,
1378+
subflowId: outerParallelId,
1379+
subflowType: 'parallel',
13681380
}
13691381
dag.nodes.get(innerSentinelStart)!.metadata = {
13701382
isSentinel: true,
1371-
isParallelSentinel: true,
13721383
sentinelType: 'start',
13731384
parallelId: innerParallelId,
1385+
subflowId: innerParallelId,
1386+
subflowType: 'parallel',
13741387
}
13751388
dag.nodes.get(innerSentinelEnd)!.metadata = {
13761389
isSentinel: true,
1377-
isParallelSentinel: true,
13781390
sentinelType: 'end',
13791391
parallelId: innerParallelId,
1392+
subflowId: innerParallelId,
1393+
subflowType: 'parallel',
13801394
}
13811395
dag.nodes.get(funcTemplate)!.metadata = {
13821396
isParallelBranch: true,
13831397
parallelId: innerParallelId,
1398+
subflowId: innerParallelId,
1399+
subflowType: 'parallel',
13841400
branchIndex: 0,
13851401
branchTotal: 1,
13861402
originalBlockId: functionId,
@@ -1479,27 +1495,35 @@ describe('EdgeConstructor', () => {
14791495
isSentinel: true,
14801496
sentinelType: 'start',
14811497
loopId,
1498+
subflowId: loopId,
1499+
subflowType: 'loop',
14821500
}
14831501
dag.nodes.get(loopSentinelEnd)!.metadata = {
14841502
isSentinel: true,
14851503
sentinelType: 'end',
14861504
loopId,
1505+
subflowId: loopId,
1506+
subflowType: 'loop',
14871507
}
14881508
dag.nodes.get(parallelSentinelStart)!.metadata = {
14891509
isSentinel: true,
1490-
isParallelSentinel: true,
14911510
sentinelType: 'start',
14921511
parallelId: innerParallelId,
1512+
subflowId: innerParallelId,
1513+
subflowType: 'parallel',
14931514
}
14941515
dag.nodes.get(parallelSentinelEnd)!.metadata = {
14951516
isSentinel: true,
1496-
isParallelSentinel: true,
14971517
sentinelType: 'end',
14981518
parallelId: innerParallelId,
1519+
subflowId: innerParallelId,
1520+
subflowType: 'parallel',
14991521
}
15001522
dag.nodes.get(funcTemplate)!.metadata = {
15011523
isParallelBranch: true,
15021524
parallelId: innerParallelId,
1525+
subflowId: innerParallelId,
1526+
subflowType: 'parallel',
15031527
branchIndex: 0,
15041528
branchTotal: 1,
15051529
originalBlockId: functionId,
@@ -1592,25 +1616,31 @@ describe('EdgeConstructor', () => {
15921616

15931617
dag.nodes.get(outerSentinelStart)!.metadata = {
15941618
isSentinel: true,
1595-
isParallelSentinel: true,
15961619
sentinelType: 'start',
15971620
parallelId: outerParallelId,
1621+
subflowId: outerParallelId,
1622+
subflowType: 'parallel',
15981623
}
15991624
dag.nodes.get(outerSentinelEnd)!.metadata = {
16001625
isSentinel: true,
1601-
isParallelSentinel: true,
16021626
sentinelType: 'end',
16031627
parallelId: outerParallelId,
1628+
subflowId: outerParallelId,
1629+
subflowType: 'parallel',
16041630
}
16051631
dag.nodes.get(innerSentinelStart)!.metadata = {
16061632
isSentinel: true,
16071633
sentinelType: 'start',
16081634
loopId: innerLoopId,
1635+
subflowId: innerLoopId,
1636+
subflowType: 'loop',
16091637
}
16101638
dag.nodes.get(innerSentinelEnd)!.metadata = {
16111639
isSentinel: true,
16121640
sentinelType: 'end',
16131641
loopId: innerLoopId,
1642+
subflowId: innerLoopId,
1643+
subflowType: 'loop',
16141644
}
16151645

16161646
const innerLoop: SerializedLoop = {

apps/sim/executor/dag/construction/edges.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { createLogger } from '@sim/logger'
22
import { toError } from '@sim/utils/errors'
33
import {
4+
CONTROL_BACK_EDGE_HANDLES,
45
EDGE,
56
isConditionBlockType,
67
isRouterBlockType,
@@ -366,6 +367,8 @@ export class EdgeConstructor {
366367
this.addEdge(dag, sourceId, sentinelEndId, handle)
367368
}
368369
}
370+
371+
this.addEdge(dag, sentinelEndId, sentinelStartId, EDGE.PARALLEL_CONTINUE, undefined, true)
369372
}
370373
}
371374

@@ -566,9 +569,7 @@ export class EdgeConstructor {
566569

567570
let hasOutgoingToLoop = false
568571
for (const [, edge] of node.outgoingEdges) {
569-
const isBackEdge =
570-
edge.sourceHandle === EDGE.LOOP_CONTINUE || edge.sourceHandle === EDGE.LOOP_CONTINUE_ALT
571-
if (isBackEdge) continue
572+
if (this.isControlBackEdge(edge.sourceHandle)) continue
572573

573574
if (effectiveNodeSet.has(edge.target)) {
574575
hasOutgoingToLoop = true
@@ -620,10 +621,7 @@ export class EdgeConstructor {
620621
if (endNode) {
621622
let hasOutgoingToParallel = false
622623
for (const [, edge] of endNode.outgoingEdges) {
623-
// Skip loop back-edges — they don't count as forward edges within the parallel
624-
const isBackEdge =
625-
edge.sourceHandle === EDGE.LOOP_CONTINUE || edge.sourceHandle === EDGE.LOOP_CONTINUE_ALT
626-
if (isBackEdge) continue
624+
if (this.isControlBackEdge(edge.sourceHandle)) continue
627625

628626
const originalTargetId = normalizeNodeId(edge.target)
629627
if (nodesSet.has(originalTargetId)) {
@@ -667,6 +665,10 @@ export class EdgeConstructor {
667665
return { startNode: templateNode, endNode: templateNode }
668666
}
669667

668+
private isControlBackEdge(sourceHandle?: string): boolean {
669+
return sourceHandle !== undefined && CONTROL_BACK_EDGE_HANDLES.has(sourceHandle)
670+
}
671+
670672
private getParallelId(blockId: string, dag: DAG): string | null {
671673
for (const [parallelId, parallelConfig] of dag.parallelConfigs) {
672674
if (parallelConfig.nodes.includes(blockId)) {
@@ -682,7 +684,7 @@ export class EdgeConstructor {
682684
targetId: string,
683685
sourceHandle?: string,
684686
targetHandle?: string,
685-
isLoopBackEdge = false
687+
isControlBackEdge = false
686688
): void {
687689
const sourceNode = dag.nodes.get(sourceId)
688690
const targetNode = dag.nodes.get(targetId)
@@ -698,10 +700,10 @@ export class EdgeConstructor {
698700
target: targetId,
699701
sourceHandle,
700702
targetHandle,
701-
isActive: isLoopBackEdge ? false : undefined,
703+
isActive: isControlBackEdge ? false : undefined,
702704
})
703705

704-
if (!isLoopBackEdge) {
706+
if (!isControlBackEdge) {
705707
targetNode.incomingEdges.add(sourceId)
706708
}
707709
}
Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { createLogger } from '@sim/logger'
2-
import { BlockType, LOOP, type SentinelType } from '@/executor/constants'
3-
import type { DAG, DAGNode } from '@/executor/dag/builder'
2+
import { BlockType, LOOP } from '@/executor/constants'
3+
import type { DAG } from '@/executor/dag/builder'
4+
import { createSubflowSentinelNode } from '@/executor/dag/construction/sentinels'
45
import { buildSentinelEndId, buildSentinelStartId } from '@/executor/utils/subflow-utils'
56

67
const logger = createLogger('LoopConstructor')
@@ -29,9 +30,10 @@ export class LoopConstructor {
2930

3031
dag.nodes.set(
3132
startId,
32-
this.createSentinelNode({
33+
createSubflowSentinelNode({
3334
id: startId,
34-
loopId,
35+
subflowId: loopId,
36+
subflowType: 'loop',
3537
sentinelType: LOOP.SENTINEL.START_TYPE,
3638
blockType: BlockType.SENTINEL_START,
3739
name: `${LOOP.SENTINEL.START_NAME_PREFIX} (${loopId})`,
@@ -40,42 +42,14 @@ export class LoopConstructor {
4042

4143
dag.nodes.set(
4244
endId,
43-
this.createSentinelNode({
45+
createSubflowSentinelNode({
4446
id: endId,
45-
loopId,
47+
subflowId: loopId,
48+
subflowType: 'loop',
4649
sentinelType: LOOP.SENTINEL.END_TYPE,
4750
blockType: BlockType.SENTINEL_END,
4851
name: `${LOOP.SENTINEL.END_NAME_PREFIX} (${loopId})`,
4952
})
5053
)
5154
}
52-
53-
private createSentinelNode(config: {
54-
id: string
55-
loopId: string
56-
sentinelType: SentinelType
57-
blockType: BlockType
58-
name: string
59-
}): DAGNode {
60-
return {
61-
id: config.id,
62-
block: {
63-
id: config.id,
64-
enabled: true,
65-
metadata: {
66-
id: config.blockType,
67-
name: config.name,
68-
loopId: config.loopId,
69-
},
70-
config: { params: {} },
71-
} as any,
72-
incomingEdges: new Set(),
73-
outgoingEdges: new Map(),
74-
metadata: {
75-
isSentinel: true,
76-
sentinelType: config.sentinelType,
77-
loopId: config.loopId,
78-
},
79-
}
80-
}
8155
}

apps/sim/executor/dag/construction/nodes.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ export class NodeConstructor {
105105
metadata: {
106106
isParallelBranch: true,
107107
parallelId,
108+
subflowId: parallelId,
109+
subflowType: 'parallel',
108110
branchIndex: 0,
109111
branchTotal: 1,
110112
isPauseResponse: block.metadata?.id === BlockType.HUMAN_IN_THE_LOOP,
@@ -130,6 +132,7 @@ export class NodeConstructor {
130132
metadata: {
131133
isLoopNode,
132134
loopId,
135+
...(loopId && { subflowId: loopId, subflowType: 'loop' as const }),
133136
isPauseResponse: isPauseBlock,
134137
originalBlockId: block.id,
135138
},
Lines changed: 9 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { createLogger } from '@sim/logger'
2-
import { BlockType, PARALLEL, type SentinelType } from '@/executor/constants'
3-
import type { DAG, DAGNode } from '@/executor/dag/builder'
2+
import { BlockType, PARALLEL } from '@/executor/constants'
3+
import type { DAG } from '@/executor/dag/builder'
4+
import { createSubflowSentinelNode } from '@/executor/dag/construction/sentinels'
45
import {
56
buildParallelSentinelEndId,
67
buildParallelSentinelStartId,
@@ -32,9 +33,10 @@ export class ParallelConstructor {
3233

3334
dag.nodes.set(
3435
startId,
35-
this.createSentinelNode({
36+
createSubflowSentinelNode({
3637
id: startId,
37-
parallelId,
38+
subflowId: parallelId,
39+
subflowType: 'parallel',
3840
sentinelType: PARALLEL.SENTINEL.START_TYPE,
3941
blockType: BlockType.SENTINEL_START,
4042
name: `${PARALLEL.SENTINEL.START_NAME_PREFIX} (${parallelId})`,
@@ -43,43 +45,14 @@ export class ParallelConstructor {
4345

4446
dag.nodes.set(
4547
endId,
46-
this.createSentinelNode({
48+
createSubflowSentinelNode({
4749
id: endId,
48-
parallelId,
50+
subflowId: parallelId,
51+
subflowType: 'parallel',
4952
sentinelType: PARALLEL.SENTINEL.END_TYPE,
5053
blockType: BlockType.SENTINEL_END,
5154
name: `${PARALLEL.SENTINEL.END_NAME_PREFIX} (${parallelId})`,
5255
})
5356
)
5457
}
55-
56-
private createSentinelNode(config: {
57-
id: string
58-
parallelId: string
59-
sentinelType: SentinelType
60-
blockType: BlockType
61-
name: string
62-
}): DAGNode {
63-
return {
64-
id: config.id,
65-
block: {
66-
id: config.id,
67-
enabled: true,
68-
metadata: {
69-
id: config.blockType,
70-
name: config.name,
71-
parallelId: config.parallelId,
72-
},
73-
config: { params: {} },
74-
} as any,
75-
incomingEdges: new Set(),
76-
outgoingEdges: new Map(),
77-
metadata: {
78-
isSentinel: true,
79-
isParallelSentinel: true,
80-
sentinelType: config.sentinelType,
81-
parallelId: config.parallelId,
82-
},
83-
}
84-
}
8558
}

0 commit comments

Comments
 (0)