fix: workflow incorrectly marked as completed while nodes are still executing#26
Conversation
WalkthroughThis change modifies the skip propagation logic in the graph traversal engine to invoke Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@api/core/workflow/graph_engine/graph_traversal/skip_propagator.py`:
- Around line 83-86: The bug is that in _propagate_skip_to_node the outgoing
edge is propagated before it is marked skipped, causing propagate_skip_from_edge
to see the edge as UNKNOWN and abort; change the order so you call
self._state_manager.mark_edge_skipped(edge.id) before calling
self.propagate_skip_from_edge(edge.id), mirroring the correct ordering used in
skip_branch_paths (mark then propagate) to ensure downstream analyze_edge_states
sees the edge as SKIPPED.
- Around line 60-65: propagate_skip_from_edge currently calls
propagate_skip_from_edge before marking the edge skipped and may call
start_execution/enqueue_node multiple times while printing; fix by first calling
mark_edge_skipped for the edge (use the same edge identifier used in
propagate_skip_from_edge), then call propagate_skip_from_edge; replace the
print(f"Starting execution for node: {downstream_node_id}") with a logger call;
and guard duplicate enqueue by checking the node's ready/enqueued state via
_state_manager before invoking _state_manager.enqueue_node(downstream_node_id)
(or ensure the ready queue deduplicates) so
_state_manager.start_execution(downstream_node_id) and enqueue are only invoked
once per node transition.
🧹 Nitpick comments (1)
api/core/workflow/graph_engine/graph_traversal/skip_propagator.py (1)
61-63: Use logger for execution flow messaging to maintain consistency with codebase conventions.The project consistently uses structured logging throughout
graph_engine/(all related modules followlogger = logging.getLogger(__name__)). Theprint()statement at line 62 is the only instance in the entire directory and bypasses log configuration. Replace withlogger.debug()to match the established pattern.♻️ Suggested change
+import logging + +logger = logging.getLogger(__name__) @@ - print(f"Starting execution for node: {downstream_node_id}") + logger.debug("Starting execution for node: %s", downstream_node_id)
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
api/core/workflow/graph_engine/graph_traversal/skip_propagator.pyapi/tests/unit_tests/core/workflow/graph_engine/graph_traversal/__init__.pyapi/tests/unit_tests/core/workflow/graph_engine/graph_traversal/test_skip_propagator.py
🧰 Additional context used
🧬 Code graph analysis (1)
api/tests/unit_tests/core/workflow/graph_engine/graph_traversal/test_skip_propagator.py (2)
api/core/workflow/graph/graph.py (2)
Graph(36-372)get_incoming_edges(364-372)api/core/workflow/graph_engine/graph_traversal/skip_propagator.py (4)
SkipPropagator(14-97)propagate_skip_from_edge(37-69)_propagate_skip_to_node(71-86)skip_branch_paths(88-97)
🔇 Additional comments (4)
api/tests/unit_tests/core/workflow/graph_engine/graph_traversal/__init__.py (1)
1-1: Nice touch adding the package docstring.
Keeps the test package self-describing.api/tests/unit_tests/core/workflow/graph_engine/graph_traversal/test_skip_propagator.py (3)
1-114: Solid coverage for unknown/taken/all-skipped edge states.
The tests exercise the core branch conditions well and validate key state-manager interactions.
115-185: Good coverage for skip propagation and branch skipping.
These tests capture the expected calls for node/edge skipping and propagation paths.
186-307: Recursive and mixed-state scenarios are well exercised.
Nice to see multi-step propagation and state reset behavior covered.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
| if edge_states["has_taken"]: | ||
| # Enqueue node | ||
| # Start execution and enqueue node | ||
| print(f"Starting execution for node: {downstream_node_id}") | ||
| self._state_manager.start_execution(downstream_node_id) | ||
| self._state_manager.enqueue_node(downstream_node_id) | ||
| return |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
rg -n -P 'class GraphStateManager\b' -C 2Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 222
🏁 Script executed:
rg -n -P 'def (start_execution|enqueue_node|analyze_edge_states)\b' -C 3Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 222
🏁 Script executed:
rg -n -P 'def (is_.*enqueued|is_.*executing|has_.*execution)\b' -C 3Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 676
🏁 Script executed:
# Also find the skip_propagator file and examine the full context
find . -name "skip_propagator.py" -type fRepository: qodo-benchmark/dify-combined-coderabbit
Length of output: 150
🏁 Script executed:
cat -n ./api/core/workflow/graph_engine/graph_traversal/skip_propagator.pyRepository: qodo-benchmark/dify-combined-coderabbit
Length of output: 3859
🏁 Script executed:
cat -n ./api/core/workflow/graph_engine/graph_state_manager.py | head -100Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 3585
🏁 Script executed:
rg -n -A 20 "def start_execution\b" ./api/core/workflow/graph_engine/graph_state_manager.pyRepository: qodo-benchmark/dify-combined-coderabbit
Length of output: 748
🏁 Script executed:
rg -n -A 20 "def enqueue_node\b" ./api/core/workflow/graph_engine/graph_state_manager.pyRepository: qodo-benchmark/dify-combined-coderabbit
Length of output: 793
Fix edge-state ordering and guard against duplicate node start/enqueue calls.
Line 85–86: propagate_skip_from_edge is called before mark_edge_skipped, leaving the edge in UNKNOWN state. When the downstream node analyzes its incoming edges, it encounters this UNKNOWN edge and returns early (line 56–57), blocking skip propagation. Reverse the order: mark the edge as skipped first.
Line 60–65: If a node has multiple incoming edges and propagate_skip_from_edge is called for each, the condition has_taken=True will trigger start_execution + enqueue_node multiple times. Although start_execution uses an idempotent set, enqueue_node calls _ready_queue.put() without deduplication, risking duplicate queue entries. Either guard by checking node state before enqueue, or verify the queue deduplicates on put.
Line 62: Replace print() with logging.
🤖 Prompt for AI Agents
In `@api/core/workflow/graph_engine/graph_traversal/skip_propagator.py` around
lines 60 - 65, propagate_skip_from_edge currently calls propagate_skip_from_edge
before marking the edge skipped and may call start_execution/enqueue_node
multiple times while printing; fix by first calling mark_edge_skipped for the
edge (use the same edge identifier used in propagate_skip_from_edge), then call
propagate_skip_from_edge; replace the print(f"Starting execution for node:
{downstream_node_id}") with a logger call; and guard duplicate enqueue by
checking the node's ready/enqueued state via _state_manager before invoking
_state_manager.enqueue_node(downstream_node_id) (or ensure the ready queue
deduplicates) so _state_manager.start_execution(downstream_node_id) and enqueue
are only invoked once per node transition.
| for edge in outgoing_edges: | ||
| self._state_manager.mark_edge_skipped(edge.id) | ||
| # Recursively propagate skip | ||
| self.propagate_skip_from_edge(edge.id) | ||
| self._state_manager.mark_edge_skipped(edge.id) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# First, let's find and examine the file and the specific lines
find . -name "skip_propagator.py" -type f | head -5Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 222
🏁 Script executed:
#!/bin/bash
# Read the file to understand the context around lines 83-86
cat -n api/core/workflow/graph_engine/graph_traversal/skip_propagator.py | sed -n '70,100p'Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 1268
🏁 Script executed:
#!/bin/bash
# Search for the propagate_skip_from_edge method implementation
rg -n "def propagate_skip_from_edge" -A 30 api/core/workflow/graph_engine/graph_traversal/skip_propagator.pyRepository: qodo-benchmark/dify-combined-coderabbit
Length of output: 1418
🏁 Script executed:
#!/bin/bash
# Search for analyze_edge_states to understand how it treats unset edge states
rg -n "def analyze_edge_states" -A 20Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 1972
🏁 Script executed:
#!/bin/bash
# Check how mark_edge_skipped works
rg -n "def mark_edge_skipped" -A 10Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 1061
Mark edge skipped before recursing into downstream propagation.
In _propagate_skip_to_node, calling propagate_skip_from_edge before mark_edge_skipped causes the edge to remain in UNKNOWN state during downstream analysis. When propagate_skip_from_edge retrieves the downstream node's incoming edges and calls analyze_edge_states, it detects the current edge as UNKNOWN, triggering an early return (line 56-57 in propagate_skip_from_edge). This blocks deeper skip propagation—a correctness regression.
The correct pattern is already implemented in skip_branch_paths (lines 95-97), where the edge is marked before propagation. Apply the same order here.
Suggested fix
for edge in outgoing_edges:
- # Recursively propagate skip
- self.propagate_skip_from_edge(edge.id)
- self._state_manager.mark_edge_skipped(edge.id)
+ self._state_manager.mark_edge_skipped(edge.id)
+ # Recursively propagate skip
+ self.propagate_skip_from_edge(edge.id)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| for edge in outgoing_edges: | |
| self._state_manager.mark_edge_skipped(edge.id) | |
| # Recursively propagate skip | |
| self.propagate_skip_from_edge(edge.id) | |
| self._state_manager.mark_edge_skipped(edge.id) | |
| for edge in outgoing_edges: | |
| self._state_manager.mark_edge_skipped(edge.id) | |
| # Recursively propagate skip | |
| self.propagate_skip_from_edge(edge.id) |
🤖 Prompt for AI Agents
In `@api/core/workflow/graph_engine/graph_traversal/skip_propagator.py` around
lines 83 - 86, The bug is that in _propagate_skip_to_node the outgoing edge is
propagated before it is marked skipped, causing propagate_skip_from_edge to see
the edge as UNKNOWN and abort; change the order so you call
self._state_manager.mark_edge_skipped(edge.id) before calling
self.propagate_skip_from_edge(edge.id), mirroring the correct ordering used in
skip_branch_paths (mark then propagate) to ensure downstream analyze_edge_states
sees the edge as SKIPPED.
Benchmark PR from qodo-benchmark#441
Summary by CodeRabbit
Release Notes
Bug Fixes
Tests
✏️ Tip: You can customize this high-level summary in your review settings.