feat: add DAG-based flow model with topological execution#116
Merged
Conversation
Implements Issue #10. Adds DAGFlow and DAGFlowStep as first-class data models alongside the existing linear Flow, with topological level-grouped execution and registration-time cycle detection. Changes: - flow.py: DAGFlowStep (extends FlowStep with step_id, depends_on, step_type, capability_id), DAGFlow, validate_dag_topology() - exceptions.py: DAGDefinitionError with reason/detail attributes - registry.py: AnyFlow = Flow | DAGFlow; topology validated on register - executor.py: dispatch on isinstance(flow, DAGFlow); _execute_dag_flow runs levels in order; sibling key-conflict detection guards determinism - __init__.py: export DAGFlow, DAGFlowStep, DAGDefinitionError - tests: 9 new DAG test classes (112 total), all pass - docs: README error table, AGENTS.md repo map, architecture.md updated step_type and capability_id are forward-compat slots for weaver-spec Invariant I-07 (kernel delegation); only 'tool' is executed today.
There was a problem hiding this comment.
Pull request overview
Adds first-class DAG support to ChainWeaver flows by introducing new DAG data models, validating DAG topology, and executing DAGs level-by-level while preserving existing linear Flow behavior.
Changes:
- Introduces
DAGFlow/DAGFlowStepmodels plusvalidate_dag_topology()for cycle/ID/dep validation. - Extends
FlowRegistryandFlowExecutorto accept and execute DAG flows (with sibling output key conflict detection). - Adds extensive DAG-focused test coverage and updates public docs/API exports.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
chainweaver/flow.py |
Adds DAG models and topology validation helper. |
chainweaver/executor.py |
Adds DAG dispatch + level-by-level DAG execution path. |
chainweaver/registry.py |
Accepts Flow | DAGFlow and validates DAG topology at registration. |
chainweaver/exceptions.py |
Adds DAGDefinitionError. |
chainweaver/__init__.py |
Exports new DAG symbols and error type. |
tests/test_flow_execution.py |
Adds DAG execution tests and keeps linear regression coverage. |
tests/test_registry.py |
Adds DAG registration/validation tests. |
README.md |
Documents new error and updates roadmap items. |
AGENTS.md |
Updates repo map and task guidance for new DAG fields. |
docs/agent-context/architecture.md |
Updates module boundaries/decision context for DAG design. |
- Fix _compute_dag_levels() to use TopologicalSorter order, preventing KeyError when steps are listed in non-dependency order in flow.steps - Add step_type guard in _execute_dag_flow() that rejects non-tool steps with a structured FlowExecutionError (capability reserved for KernelBackedExecutor) - Add Pydantic model_validator on DAGFlowStep: step_type='tool' must have capability_id=None (capability steps left unconstrained for future spec) - Fix DAGFlow docstring: DAGDefinitionError is raised when validate_dag_topology() is invoked, not at model-validation time - Fix _compute_dag_levels() docstring: acknowledge belt-and-suspenders validate_dag_topology() call can raise DAGDefinitionError - Reconcile parallel execution version reference: v0.4 -> v0.2 in architecture.md and flow.py DAGFlow docstring (aligned with README) - Add tests: reverse-ordered steps, capability step rejection, tool-step-with-capability_id rejection, capability-without-id acceptance
- Export validate_dag_topology in __init__.py __all__ (invariant 5) - Update __init__.py docstring to include DAG types in API example - Update executor TODO: replace completed DAG item with parallel/async level execution which is still pending - Restore two removed flow.py TODOs for conditional branching and determinism scoring (still unimplemented) - Add DAG output schema failure test (TestDAGFlowLevelSchemas) - Remove dead code: unused _Doubled/_Tripled/_Sum/_Formatted classes and unused _make_dag_executor stub in tests
README.md: keep both DAGDefinitionError and FlowBuilderError rows in the error table (both added independently on each branch). chainweaver/__init__.py: merge DAG types and validate_dag_topology (this branch) with FlowBuilder/FlowBuilderError (main); update docstring example to show both additions. tests/test_flow_execution.py: use version-portable ZeroDivisionError assertion from main (detail set matching Python <= 3.13 and 3.14+) instead of simple string match.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Implements Issue #10: Design and implement a DAG-based flow model with topological execution.
DAGFlowandDAGFlowStepare added as first-class data models alongside the existing linearFlow. The executor gains a unified dispatch path —isinstance(flow, DAGFlow)routes to a new level-by-level runner while the linear path is completely unchanged.What changed
chainweaver/flow.pyDAGFlowStep(extendsFlowStepwithstep_id,depends_on,step_type,capability_id),DAGFlow,validate_dag_topology()chainweaver/exceptions.pyDAGDefinitionError(flow_name, reason, detail)— reason ∈{cycle, duplicate_step_id, unknown_dependency}chainweaver/registry.pyAnyFlow = Flow | DAGFlow; topology validated at registration time viavalidate_dag_topologychainweaver/executor.py_execute_dag_flow()runs steps level-by-level (topological sort viagraphlib.TopologicalSorter); sibling key-conflict detection guards output determinism;_execute_stepreused viaFlowStepproxychainweaver/__init__.pyDAGFlow,DAGFlowStep,DAGDefinitionErrortests/test_flow_execution.pytests/test_registry.pyTestDAGFlowRegistration— 9 tests covering valid registration, coexistence with linear flows, duplicate ID, unknown dep, cycle, self-loop, overwrite, get, match-by-intentREADME.mdDAGDefinitionErrorto error table; updated v0.2 roadmap checkboxesAGENTS.mdDAGFlow/DAGFlowSteprow to common-tasks tabledocs/agent-context/architecture.mdstep_type/capability_idrationale, registration-time validation)Why
DAG execution is ChainWeaver's core identity per the weaver-spec ("Deterministic DAG/flow orchestration"). The existing
Flowmodel was linear-only with three# TODO (Phase 2)placeholders inflow.py.The
step_type+capability_idfields are zero-cost forward-compat slots for weaver-spec Invariant I-07 (kernel delegation). Only"tool"is executed today;"capability"is reserved for the futureKernelBackedExecutor.Testing performed
Executor invariants preserved
executor.pyexecutor.pyexecutor.pyDocs and AI-agent instruction updates
README.md— error table + roadmap updatedAGENTS.md— repo map + common-tasks table updateddocs/agent-context/architecture.md— module boundaries and decision context updated.github/copilot-instructions.md— no change needed (routes to AGENTS.md/docs).github/instructions/chainweaver.instructions.md— no change needed.github/instructions/tests.instructions.md— no change neededChecklist
__init__.py__all__updated for all new public symbolsFlowexecution path completely unaffected