From 24b45953e65cee76f49b30ed0603076adb4f6339 Mon Sep 17 00:00:00 2001 From: Jason Parraga Date: Tue, 27 Jan 2026 21:34:25 -0800 Subject: [PATCH 1/3] Add unit test Signed-off-by: Jason Parraga --- .../integration/remote/test_remote.py | 8 ++++ .../workflows/basic/conditional_workflow.py | 39 +++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 tests/flytekit/integration/remote/workflows/basic/conditional_workflow.py diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 25384b6eec..52f9c4a217 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -1358,3 +1358,11 @@ def test_run_wf_with_resource_requests_override(register): ], limits=[], ) + +def test_conditional_workflow(): + execution_id = run("conditional_workflow.py", "wf") + remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN) + execution = remote.fetch_execution(name=execution_id) + execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5)) + print("Execution Error:", execution.error) + assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}" diff --git a/tests/flytekit/integration/remote/workflows/basic/conditional_workflow.py b/tests/flytekit/integration/remote/workflows/basic/conditional_workflow.py new file mode 100644 index 0000000000..d052a5fc65 --- /dev/null +++ b/tests/flytekit/integration/remote/workflows/basic/conditional_workflow.py @@ -0,0 +1,39 @@ +import flytekit as fl +from flytekit import conditional +from flytekit.core.task import Echo + +echo_radius = Echo(name="noop", inputs={"radius": float}) + + +@fl.task +def calculate_circle_circumference(radius: float) -> float: + return 2 * 3.14 * radius # Task to calculate the circumference of a circle + + +@fl.task +def calculate_circle_area(radius: float) -> float: + return 3.14 * radius * radius # Task to calculate the area of a circle + + +@fl.task +def nop(radius: float) -> float: + return radius # Task that does nothing, effectively a no-op + + +@fl.workflow +def wf(radius: float = 0.5, get_area: bool = False, get_circumference: bool = True): + echoed_radius = nop(radius=radius) + ( + conditional("if_area") + .if_(get_area.is_true()) + .then(calculate_circle_area(radius=radius)) + .else_() + .then(echo_radius(echoed_radius)) + ) + ( + conditional("if_circumference") + .if_(get_circumference.is_true()) + .then(calculate_circle_circumference(radius=echoed_radius)) + .else_() + .then(echo_radius(echoed_radius)) + ) From 4a9aca3423e11d558606af3c31a49ff73ab6d63d Mon Sep 17 00:00:00 2001 From: Jason Parraga Date: Tue, 27 Jan 2026 22:29:04 -0800 Subject: [PATCH 2/3] move fix Signed-off-by: Jason Parraga --- flytekit/remote/remote.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 1af159108a..09b3d1ee8c 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -2763,6 +2763,34 @@ def sync_node_execution( logger.info("Skipping gate node execution for now - gate nodes don't have inputs and outputs filled in") return execution + # Handle the case where it's a branch node + elif execution._node.branch_node is not None: + # We'll need to query child node executions regardless since this is a parent node + child_node_executions = iterate_node_executions( + self.client, + workflow_execution_identifier=execution.id.execution_id, + unique_parent_id=execution.id.node_id, + ) + child_node_executions = [x for x in child_node_executions] + + sub_flyte_workflow = typing.cast(FlyteBranchNode, execution._node.flyte_entity) + sub_node_mapping = {} + if sub_flyte_workflow.if_else.case.then_node: + then_node = sub_flyte_workflow.if_else.case.then_node + sub_node_mapping[then_node.id] = then_node + if sub_flyte_workflow.if_else.other: + for case in sub_flyte_workflow.if_else.other: + then_node = case.then_node + sub_node_mapping[then_node.id] = then_node + if sub_flyte_workflow.if_else.else_node: + else_node = sub_flyte_workflow.if_else.else_node + sub_node_mapping[else_node.id] = else_node + + execution._underlying_node_executions = [ + self.sync_node_execution(FlyteNodeExecution.promote_from_model(cne), sub_node_mapping) + for cne in child_node_executions + ] + # This is the plain ol' task execution case else: execution._task_executions = [ From bbb855bac8f6e06a273793eb4bb66186cd447d8b Mon Sep 17 00:00:00 2001 From: Jason Parraga Date: Wed, 28 Jan 2026 12:36:01 -0800 Subject: [PATCH 3/3] Remove print Signed-off-by: Jason Parraga --- tests/flytekit/integration/remote/test_remote.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 52f9c4a217..5ea3e9e2a2 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -1364,5 +1364,4 @@ def test_conditional_workflow(): remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN) execution = remote.fetch_execution(name=execution_id) execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5)) - print("Execution Error:", execution.error) assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}"