From 24dc3b90e1040b09e3e9b54cf428aac008053e7a Mon Sep 17 00:00:00 2001 From: Diogo Andre Santos Date: Mon, 13 Apr 2026 06:23:48 +0100 Subject: [PATCH 1/3] feat: add DAG-based flow model with topological execution Implements Issue #10. Adds DAGFlow and DAGFlowStep as first-class data models alongside the existing linear Flow, with topological level-grouped execution and registration-time cycle detection. Changes: - flow.py: DAGFlowStep (extends FlowStep with step_id, depends_on, step_type, capability_id), DAGFlow, validate_dag_topology() - exceptions.py: DAGDefinitionError with reason/detail attributes - registry.py: AnyFlow = Flow | DAGFlow; topology validated on register - executor.py: dispatch on isinstance(flow, DAGFlow); _execute_dag_flow runs levels in order; sibling key-conflict detection guards determinism - __init__.py: export DAGFlow, DAGFlowStep, DAGDefinitionError - tests: 9 new DAG test classes (112 total), all pass - docs: README error table, AGENTS.md repo map, architecture.md updated step_type and capability_id are forward-compat slots for weaver-spec Invariant I-07 (kernel delegation); only 'tool' is executed today. --- AGENTS.md | 7 +- README.md | 7 +- chainweaver/__init__.py | 6 +- chainweaver/exceptions.py | 22 ++ chainweaver/executor.py | 199 +++++++++- chainweaver/flow.py | 170 +++++++- chainweaver/registry.py | 47 ++- docs/agent-context/architecture.md | 11 +- tests/test_flow_execution.py | 608 ++++++++++++++++++++++++++++- tests/test_registry.py | 105 ++++- 10 files changed, 1148 insertions(+), 34 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 46efdd0..92c965c 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -36,9 +36,9 @@ chainweaver/ ├── __init__.py Public API surface; all exports in __all__ ├── decorators.py @tool decorator for zero-boilerplate tool definition ├── tools.py Tool class: named callable with Pydantic I/O schemas -├── flow.py FlowStep + Flow: ordered step definitions (Pydantic models) -├── registry.py FlowRegistry: in-memory catalogue of named flows -├── executor.py FlowExecutor: sequential, LLM-free runner (main entry point) +├── flow.py FlowStep + Flow (linear) + DAGFlowStep + DAGFlow + validate_dag_topology +├── registry.py FlowRegistry: in-memory catalogue of Flow and DAGFlow +├── executor.py FlowExecutor: sequential/DAG runner (main entry point) ├── exceptions.py Typed exception hierarchy (all inherit ChainWeaverError) ├── log_utils.py Structured per-step logging utilities └── py.typed PEP 561 marker @@ -131,6 +131,7 @@ For the full prohibited-actions list and anti-patterns, see | Add a new exception | `exceptions.py` | `__init__.py` + `__all__` + README error table — **same PR** | | Modify flow execution | `executor.py` | Keep `StepRecord` + `ExecutionResult` consistent | | Add a new Flow field | `flow.py` | Serialization tests if `model_dump()` changes | +| Add a new DAGFlow / DAGFlowStep field | `flow.py` | Update `validate_dag_topology` if needed; update tests | | Change logging format | `log_utils.py` | Update tests (no re-export needed) | | Add a new module | See [new-module checklist](docs/agent-context/workflows.md#new-module-checklist) | diff --git a/README.md b/README.md index cba5674..12260ef 100644 --- a/README.md +++ b/README.md @@ -405,6 +405,7 @@ All errors are typed and traceable: | `InputMappingError` | A mapping key is not present in the context | | `FlowExecutionError` | The tool callable raises an unexpected exception | | `ToolDefinitionError` | The `@tool` decorator cannot build a tool from a function | +| `DAGDefinitionError` | A `DAGFlow` has a cycle, duplicate `step_id`, or unknown dependency | All exceptions inherit from `ChainWeaverError`. @@ -424,8 +425,10 @@ All exceptions inherit from `ChainWeaverError`. ### v0.2 — DAG & Branching -- [ ] DAG-based execution with dependency edges -- [ ] Parallel step groups +- [x] DAG-based execution with dependency edges (`DAGFlow`, `DAGFlowStep`) +- [x] Topological level-grouped execution with sibling key-conflict detection +- [x] `DAGDefinitionError` — cycle / duplicate ID / unknown dep detected at registration +- [ ] Actual parallel/async execution for independent levels - [ ] Conditional branching inside flows ### v0.3 — Persistence & Learning diff --git a/chainweaver/__init__.py b/chainweaver/__init__.py index f55a0c5..ac27131 100644 --- a/chainweaver/__init__.py +++ b/chainweaver/__init__.py @@ -25,6 +25,7 @@ from chainweaver.decorators import tool from chainweaver.exceptions import ( ChainWeaverError, + DAGDefinitionError, FlowAlreadyExistsError, FlowExecutionError, FlowNotFoundError, @@ -34,7 +35,7 @@ ToolNotFoundError, ) from chainweaver.executor import ExecutionResult, FlowExecutor, StepRecord -from chainweaver.flow import Flow, FlowStep +from chainweaver.flow import DAGFlow, DAGFlowStep, Flow, FlowStep from chainweaver.registry import FlowRegistry from chainweaver.tools import Tool @@ -46,6 +47,9 @@ __all__ = [ "ChainWeaverError", + "DAGDefinitionError", + "DAGFlow", + "DAGFlowStep", "ExecutionResult", "Flow", "FlowAlreadyExistsError", diff --git a/chainweaver/exceptions.py b/chainweaver/exceptions.py index 3a73c21..c56cb51 100644 --- a/chainweaver/exceptions.py +++ b/chainweaver/exceptions.py @@ -80,3 +80,25 @@ def __init__(self, function_name: str, detail: str) -> None: self.function_name = function_name self.detail = detail super().__init__(f"Cannot define tool from function '{function_name}': {detail}") + + +class DAGDefinitionError(ChainWeaverError): + """Raised when a :class:`~chainweaver.flow.DAGFlow` definition is invalid. + + Attributes: + flow_name: Name of the flow that failed validation. + reason: Machine-readable reason code. One of ``"cycle"``, + ``"duplicate_step_id"``, or ``"unknown_dependency"``. + detail: Human-readable explanation. + """ + + def __init__( + self, + flow_name: str, + reason: str, + detail: str, + ) -> None: + self.flow_name = flow_name + self.reason = reason + self.detail = detail + super().__init__(f"Invalid DAG flow '{flow_name}' ({reason}): {detail}") diff --git a/chainweaver/executor.py b/chainweaver/executor.py index 235864e..d1b2485 100644 --- a/chainweaver/executor.py +++ b/chainweaver/executor.py @@ -18,7 +18,7 @@ SchemaValidationError, ToolNotFoundError, ) -from chainweaver.flow import FlowStep +from chainweaver.flow import DAGFlow, DAGFlowStep, FlowStep, validate_dag_topology from chainweaver.log_utils import get_logger, log_step_end, log_step_error, log_step_start from chainweaver.registry import FlowRegistry from chainweaver.tools import Tool @@ -159,6 +159,9 @@ def execute_flow( FlowNotFoundError: When *flow_name* is not registered. """ flow = self._registry.get_flow(flow_name) + if isinstance(flow, DAGFlow): + return self._execute_dag_flow(flow, initial_input) + _logger.info("Flow '%s' started | steps=%d", flow_name, len(flow.steps)) # -- Flow-level input validation ------------------------------------ @@ -362,3 +365,197 @@ def _execute_step( outputs=outputs, success=True, ) + + # ------------------------------------------------------------------ + # DAG execution + # ------------------------------------------------------------------ + + def _compute_dag_levels(self, flow: DAGFlow) -> list[list[DAGFlowStep]]: + """Return steps grouped into topological execution levels. + + Within each level all steps are independent (no inter-level edges). + Steps in the same level can conceptually run in parallel; today they + run sequentially in list order. + + The topology is already validated at registration time, so this + method cannot raise :class:`~chainweaver.exceptions.DAGDefinitionError`. + The ``validate_dag_topology`` call here is a belt-and-suspenders guard + for flows that are created and executed without going through + :class:`~chainweaver.registry.FlowRegistry`. + + Args: + flow: A valid :class:`~chainweaver.flow.DAGFlow`. + + Returns: + A list of levels, each level being a list of + :class:`~chainweaver.flow.DAGFlowStep` objects. + """ + validate_dag_topology(flow) + step_by_id = {s.step_id: s for s in flow.steps} + # level[step_id] = 0-based level index + levels: dict[str, int] = {} + for step in flow.steps: + if not step.depends_on: + levels[step.step_id] = 0 + else: + levels[step.step_id] = max(levels[dep] for dep in step.depends_on) + 1 + + max_level = max(levels.values(), default=-1) + grouped: list[list[DAGFlowStep]] = [[] for _ in range(max_level + 1)] + for step in flow.steps: + grouped[levels[step.step_id]].append(step_by_id[step.step_id]) + return grouped + + def _execute_dag_flow( + self, + flow: DAGFlow, + initial_input: dict[str, Any], + ) -> ExecutionResult: + """Execute a :class:`~chainweaver.flow.DAGFlow`. + + Steps are executed level-by-level in topological order. Within each + level steps run sequentially. Outputs from all steps in a level are + collected and merged into the shared context before the next level + starts. If two sibling steps (same level) produce the same output + key a :class:`~chainweaver.exceptions.FlowExecutionError` is raised + immediately to preserve determinism. + + Args: + flow: The :class:`~chainweaver.flow.DAGFlow` to execute. + initial_input: Initial key/value context. + + Returns: + An :class:`ExecutionResult` with the full execution log. + """ + _logger.info("DAGFlow '%s' started | steps=%d", flow.name, len(flow.steps)) + + # -- Flow-level input validation ------------------------------------ + if flow.input_schema is not None: + try: + flow.input_schema.model_validate(initial_input) + except ValidationError as exc: + wrapped = SchemaValidationError(flow.name, -1, str(exc), context="flow_input") + _logger.error("DAGFlow '%s' input validation failed: %s", flow.name, wrapped) + return ExecutionResult( + flow_name=flow.name, + success=False, + final_output=None, + execution_log=[ + StepRecord( + step_index=-1, + tool_name=flow.name, + inputs=dict(initial_input), + error=wrapped, + success=False, + ) + ], + ) + + context: dict[str, Any] = dict(initial_input) + log: list[StepRecord] = [] + levels = self._compute_dag_levels(flow) + # Flat index for StepRecord.step_index (mirrors linear flow behaviour). + flat_index = 0 + + for level_steps in levels: + level_outputs: dict[str, Any] = {} + level_records: list[StepRecord] = [] + + for step in level_steps: + # Build a lightweight FlowStep-compatible view so _execute_step + # can be reused without modification. + proxy = FlowStep( + tool_name=step.tool_name, + input_mapping=step.input_mapping, + ) + record = self._execute_step(flat_index, proxy, context) + level_records.append(record) + flat_index += 1 + + if not record.success: + log.extend(level_records) + _logger.error( + "DAGFlow '%s' aborted at step %d (%s)", + flow.name, + record.step_index, + step.tool_name, + ) + return ExecutionResult( + flow_name=flow.name, + success=False, + final_output=None, + execution_log=log, + ) + + assert record.outputs is not None # success guarantees outputs + # Detect sibling key conflicts to preserve determinism. + for key, value in record.outputs.items(): + if key in level_outputs: + conflict_err = FlowExecutionError( + step.tool_name, + record.step_index, + f"Key '{key}' produced by both '{step.tool_name}' and a " + f"sibling step in the same DAG level. " + f"Use distinct output keys or sequential steps.", + ) + record_conflict = StepRecord( + step_index=record.step_index, + tool_name=step.tool_name, + inputs=record.inputs, + error=conflict_err, + success=False, + ) + log.extend(level_records[:-1]) + log.append(record_conflict) + _logger.error("DAGFlow '%s': sibling key conflict on '%s'", flow.name, key) + return ExecutionResult( + flow_name=flow.name, + success=False, + final_output=None, + execution_log=log, + ) + level_outputs[key] = value + + log.extend(level_records) + # Merge all level outputs into context after the level completes. + for key in level_outputs: + if key in context: + _logger.debug( + "DAGFlow '%s': context key '%s' overwritten by level output", + flow.name, + key, + ) + context.update(level_outputs) + + # -- Flow-level output validation ----------------------------------- + if flow.output_schema is not None: + try: + flow.output_schema.model_validate(context) + except ValidationError as exc: + wrapped = SchemaValidationError( + flow.name, len(flow.steps), str(exc), context="flow_output" + ) + _logger.error("DAGFlow '%s' output validation failed: %s", flow.name, wrapped) + return ExecutionResult( + flow_name=flow.name, + success=False, + final_output=None, + execution_log=[ + *log, + StepRecord( + step_index=len(flow.steps), + tool_name=flow.name, + inputs=dict(context), + error=wrapped, + success=False, + ), + ], + ) + + _logger.info("DAGFlow '%s' completed successfully", flow.name) + return ExecutionResult( + flow_name=flow.name, + success=True, + final_output=context, + execution_log=log, + ) diff --git a/chainweaver/flow.py b/chainweaver/flow.py index ca623c3..b082d5b 100644 --- a/chainweaver/flow.py +++ b/chainweaver/flow.py @@ -4,14 +4,22 @@ wire tool outputs into the next tool's inputs. Flows are registered in a :class:`~chainweaver.registry.FlowRegistry` and executed by a :class:`~chainweaver.executor.FlowExecutor`. + +:class:`DAGFlow` extends this with a directed-acyclic-graph model where each +:class:`DAGFlowStep` declares explicit ``depends_on`` edges. Topology +validation (cycle detection, duplicate IDs, unknown deps) is performed at +registration time via :func:`validate_dag_topology`. """ from __future__ import annotations -from typing import Any +from graphlib import CycleError, TopologicalSorter +from typing import Any, Literal from pydantic import BaseModel, ConfigDict, Field +from chainweaver.exceptions import DAGDefinitionError + class FlowStep(BaseModel): """A single step inside a :class:`Flow`. @@ -95,11 +103,159 @@ class Flow(BaseModel): input_schema: type[BaseModel] | None = None output_schema: type[BaseModel] | None = None - # TODO (Phase 2): Add support for DAG-based steps with explicit - # dependency edges and parallel execution groups. - # TODO (Phase 2): Add conditional branching — a step that inspects - # context values and selects the next step(s) at runtime. +# --------------------------------------------------------------------------- +# DAG model +# --------------------------------------------------------------------------- + + +class DAGFlowStep(FlowStep): + """A single step inside a :class:`DAGFlow`. + + Extends :class:`FlowStep` with an explicit identity and dependency + declaration so that the executor can build a dependency graph and execute + steps in topological order. + + Attributes: + step_id: Unique identifier for this step within the flow. Used to + reference the step in ``depends_on`` lists of other steps. + depends_on: List of ``step_id`` values that must complete before this + step can start. An empty list (the default) means the step has + no dependencies and runs in the first execution level. + step_type: Discriminator that indicates how the step is executed. + ``"tool"`` (the default) means a locally-registered + :class:`~chainweaver.tools.Tool` is invoked. The reserved value + ``"capability"`` is a forward-compat slot for kernel-delegated + capability invocations via the Weaver Stack agent-kernel contract + (see invariant I-07 in weaver-spec). Only ``"tool"`` is executed + today; ``"capability"`` will be dispatched by + ``KernelBackedExecutor`` in a future release. + capability_id: Weaver Stack capability identifier used when + ``step_type == "capability"``. Ignored (and should be ``None``) + for ``step_type == "tool"``. + + Example:: + + step = DAGFlowStep( + tool_name="fetch_data", + step_id="fetch", + depends_on=[], + ) + step_b = DAGFlowStep( + tool_name="transform", + step_id="transform", + depends_on=["fetch"], + ) + """ + + step_id: str + depends_on: list[str] = Field(default_factory=list) + step_type: Literal["tool", "capability"] = "tool" + capability_id: str | None = None + + +class DAGFlow(BaseModel): + """A deterministic, DAG-structured sequence of tool invocations. + + Steps are ordered by their ``depends_on`` declarations. Independent + steps (no unmet predecessors) form an execution *level* and run + sequentially within that level (parallel execution is a planned v0.4 + optimisation). + + Attributes: + name: Unique identifier for the flow. + description: Human-readable description of what the flow does. + steps: List of :class:`DAGFlowStep` objects. Order within the list + does not imply execution order — the executor derives order from + ``depends_on`` edges. + deterministic: When ``True`` (the default) the executor guarantees + that no LLM calls are inserted between steps. + trigger_conditions: Optional free-form metadata for agent-level + dispatch (not evaluated by ChainWeaver itself). + input_schema: Optional Pydantic :class:`~pydantic.BaseModel` subclass + validated against ``initial_input`` before the first step runs. + output_schema: Optional Pydantic :class:`~pydantic.BaseModel` subclass + validated against the final merged context after all steps finish. + + Raises: + DAGDefinitionError: If topology is invalid (cycle, duplicate + ``step_id``, or unknown ``depends_on`` reference). Raised at + model-validation time so callers learn about the error before any + execution attempt. + + Example:: + + dag = DAGFlow( + name="diamond", + description="A → (B, C) → D", + steps=[ + DAGFlowStep(tool_name="a", step_id="A", depends_on=[]), + DAGFlowStep(tool_name="b", step_id="B", depends_on=["A"]), + DAGFlowStep(tool_name="c", step_id="C", depends_on=["A"]), + DAGFlowStep(tool_name="d", step_id="D", depends_on=["B", "C"]), + ], + ) + """ + + model_config = ConfigDict(arbitrary_types_allowed=True) + + name: str + description: str + steps: list[DAGFlowStep] + deterministic: bool = True + trigger_conditions: dict[str, Any] | None = None + input_schema: type[BaseModel] | None = None + output_schema: type[BaseModel] | None = None + + +# --------------------------------------------------------------------------- +# Topology validation helper +# --------------------------------------------------------------------------- + + +def validate_dag_topology(flow: DAGFlow) -> None: + """Validate the topology of a :class:`DAGFlow`. + + Checks (in order): + + 1. No duplicate ``step_id`` values. + 2. Every ``depends_on`` entry refers to a ``step_id`` that exists. + 3. No cycles (via :class:`graphlib.TopologicalSorter`). + + Args: + flow: The :class:`DAGFlow` to validate. + + Raises: + DAGDefinitionError: On any topology violation with a structured + ``reason`` attribute (``"duplicate_step_id"``, + ``"unknown_dependency"``, or ``"cycle"``). + """ + step_ids: set[str] = set() + for step in flow.steps: + if step.step_id in step_ids: + raise DAGDefinitionError( + flow.name, + "duplicate_step_id", + f"Step id '{step.step_id}' appears more than once.", + ) + step_ids.add(step.step_id) + + for step in flow.steps: + for dep in step.depends_on: + if dep not in step_ids: + raise DAGDefinitionError( + flow.name, + "unknown_dependency", + f"Step '{step.step_id}' depends on unknown step id '{dep}'.", + ) - # TODO (Phase 2): Add determinism scoring so that partially - # deterministic flows can be marked and handled appropriately. + graph: dict[str, set[str]] = {step.step_id: set(step.depends_on) for step in flow.steps} + sorter: TopologicalSorter[str] = TopologicalSorter(graph) + try: + sorter.prepare() + except CycleError as exc: + raise DAGDefinitionError( + flow.name, + "cycle", + f"Dependency cycle detected: {exc}.", + ) from exc diff --git a/chainweaver/registry.py b/chainweaver/registry.py index ce11a4a..454c796 100644 --- a/chainweaver/registry.py +++ b/chainweaver/registry.py @@ -1,26 +1,40 @@ """In-memory flow registry for ChainWeaver. The :class:`FlowRegistry` is the central catalogue of all registered -:class:`~chainweaver.flow.Flow` objects. Flows must be registered before -they can be executed by a :class:`~chainweaver.executor.FlowExecutor`. +:class:`~chainweaver.flow.Flow` and :class:`~chainweaver.flow.DAGFlow` +objects. Flows must be registered before they can be executed by a +:class:`~chainweaver.executor.FlowExecutor`. + +For :class:`~chainweaver.flow.DAGFlow` registrations, topology validation +(cycle detection, duplicate step IDs, unknown dependency references) is +performed at registration time via +:func:`~chainweaver.flow.validate_dag_topology`. """ from __future__ import annotations from chainweaver.exceptions import FlowAlreadyExistsError, FlowNotFoundError -from chainweaver.flow import Flow +from chainweaver.flow import DAGFlow, Flow, validate_dag_topology + +AnyFlow = Flow | DAGFlow class FlowRegistry: - """An in-memory registry of :class:`~chainweaver.flow.Flow` objects. + """An in-memory registry of :class:`~chainweaver.flow.Flow` and + :class:`~chainweaver.flow.DAGFlow` objects. Flows are stored by name. The registry is intentionally simple so that it can be wrapped, persisted, or replaced in later phases. + :class:`~chainweaver.flow.DAGFlow` instances are topology-validated at + registration time; a :class:`~chainweaver.exceptions.DAGDefinitionError` + is raised immediately if the graph is invalid. + Example:: registry = FlowRegistry() registry.register_flow(my_flow) + registry.register_flow(my_dag_flow) flow = registry.get_flow("my_flow") # TODO (Phase 2): Persist and reload flows from JSON/YAML storage. @@ -29,10 +43,16 @@ class FlowRegistry: """ def __init__(self) -> None: - self._flows: dict[str, Flow] = {} + self._flows: dict[str, AnyFlow] = {} + + def register_flow(self, flow: AnyFlow, *, overwrite: bool = False) -> None: + """Register a :class:`~chainweaver.flow.Flow` or + :class:`~chainweaver.flow.DAGFlow`. - def register_flow(self, flow: Flow, *, overwrite: bool = False) -> None: - """Register a :class:`~chainweaver.flow.Flow`. + For :class:`~chainweaver.flow.DAGFlow` instances, topology validation + is run before storing: duplicate ``step_id`` values, unknown + ``depends_on`` references, and dependency cycles all raise + :class:`~chainweaver.exceptions.DAGDefinitionError`. Args: flow: The flow to register. @@ -42,19 +62,24 @@ def register_flow(self, flow: Flow, *, overwrite: bool = False) -> None: Raises: FlowAlreadyExistsError: When a flow with the same name is already registered and *overwrite* is ``False``. + DAGDefinitionError: When *flow* is a :class:`~chainweaver.flow.DAGFlow` + with an invalid topology. """ if flow.name in self._flows and not overwrite: raise FlowAlreadyExistsError(flow.name) + if isinstance(flow, DAGFlow): + validate_dag_topology(flow) self._flows[flow.name] = flow - def get_flow(self, name: str) -> Flow: + def get_flow(self, name: str) -> AnyFlow: """Return the flow registered under *name*. Args: name: The name of the flow to retrieve. Returns: - The registered :class:`~chainweaver.flow.Flow`. + The registered :class:`~chainweaver.flow.Flow` or + :class:`~chainweaver.flow.DAGFlow`. Raises: FlowNotFoundError: When no flow with *name* is registered. @@ -72,7 +97,7 @@ def list_flows(self) -> list[str]: """ return list(self._flows.keys()) - def match_flow_by_intent(self, intent: str) -> Flow | None: + def match_flow_by_intent(self, intent: str) -> AnyFlow | None: """Return the first flow whose name or description contains *intent*. This is a very basic substring match intended as a placeholder for a @@ -82,7 +107,7 @@ def match_flow_by_intent(self, intent: str) -> Flow | None: intent: A short phrase or keyword describing the desired operation. Returns: - The first matching :class:`~chainweaver.flow.Flow`, or ``None``. + The first matching flow, or ``None``. # TODO (Phase 2): Replace with embedding-based semantic similarity so # agents can discover flows from natural-language descriptions. diff --git a/docs/agent-context/architecture.md b/docs/agent-context/architecture.md index ca08ced..ee67a02 100644 --- a/docs/agent-context/architecture.md +++ b/docs/agent-context/architecture.md @@ -23,9 +23,9 @@ and tools, the same flow produces the same output every time. |--------|---------------|----------------| | `decorators.py` | `@tool` decorator for zero-boilerplate tool definition | Returns a `Tool` subclass; introspects type hints | | `tools.py` | Define `Tool`: name + callable + Pydantic I/O schemas | Tool functions must be `fn(BaseModel) -> dict[str, Any]` | -| `flow.py` | Define `FlowStep` and `Flow` as Pydantic models | Pure data definitions; no execution logic | -| `registry.py` | Store and retrieve flows by name | In-memory; intentionally simple for later wrapping | -| `executor.py` | Run flows step-by-step, validate I/O, merge context | **No LLM, no network I/O, no randomness** | +| `flow.py` | Define `FlowStep`, `Flow` (linear), `DAGFlowStep`, `DAGFlow`, `validate_dag_topology` | Pure data definitions + topology validation; no execution logic | +| `registry.py` | Store and retrieve `Flow` and `DAGFlow` by name; validates DAG topology at registration | In-memory; intentionally simple for later wrapping | +| `executor.py` | Run flows step-by-step (linear) or level-by-level (DAG), validate I/O, merge context | **No LLM, no network I/O, no randomness** | | `exceptions.py` | Typed exception hierarchy | All inherit `ChainWeaverError`; carry context attrs | | `log_utils.py` | Per-step structured logging | Library-safe (NullHandler only); no handler config | | `__init__.py` | Public API surface | Every public symbol must be in `__all__` | @@ -36,11 +36,14 @@ and tools, the same flow produces the same output every time. | Decision | Rationale | |----------|-----------| -| Sequential-only execution | Phase 1 MVP. DAG execution is planned for v0.2. | +| Sequential-only execution for linear `Flow` | Phase 1 MVP. Unchanged. | +| DAG execution for `DAGFlow` | Phase 2: topological level grouping. Parallel execution of a level is a future v0.4 optimisation. | | Pydantic for all schemas | Deterministic I/O contracts between steps. | | No LLM calls in executor | "Compiled, not interpreted." | | `from __future__ import annotations` | Forward-reference support; cleaner type hints. | | `dataclass` for `StepRecord`/`ExecutionResult` | They carry `Exception` instances; Pydantic cannot serialize these. | +| `step_type` + `capability_id` on `DAGFlowStep` | Forward-compat slots for Weaver Stack kernel integration (weaver-spec I-07). Only `"tool"` is executed today; `"capability"` is reserved for `KernelBackedExecutor`. | +| Cycle detection at registration time | Fail fast — no silent deferral to execution. Belt-and-suspenders check also runs in the executor for flows created without registry. | --- diff --git a/tests/test_flow_execution.py b/tests/test_flow_execution.py index f3e4c26..d8c1b7b 100644 --- a/tests/test_flow_execution.py +++ b/tests/test_flow_execution.py @@ -19,7 +19,7 @@ ToolNotFoundError, ) from chainweaver.executor import FlowExecutor -from chainweaver.flow import Flow, FlowStep +from chainweaver.flow import DAGFlow, DAGFlowStep, Flow, FlowStep from chainweaver.registry import FlowRegistry from chainweaver.tools import Tool @@ -680,7 +680,7 @@ def divide_fn(inp: DivInput) -> dict: record = result.execution_log[0] assert record.success is False assert isinstance(record.error, FlowExecutionError) - assert "integer division or modulo by zero" in str(record.error) + assert "division by zero" in str(record.error) # --------------------------------------------------------------------------- @@ -707,7 +707,609 @@ def test_large_positive_input(self, executor: FlowExecutor) -> None: def test_large_negative_input(self, executor: FlowExecutor) -> None: result = executor.execute_flow("double_add_format", {"number": -1000}) - # double(-1000) \u2192 -2000, add_ten(-2000) \u2192 -1990 + # double(-1000) → -2000, add_ten(-2000) → -1990 assert result.success is True assert result.final_output is not None assert result.final_output["result"] == "Final value: -1990" + + +# --------------------------------------------------------------------------- +# DAG flow execution +# --------------------------------------------------------------------------- + +# Shared helpers for DAG tests + + +class _Doubled(BaseModel): + doubled: int + + +class _Tripled(BaseModel): + tripled: int + + +class _Sum(BaseModel): + total: int + + +class _Formatted(BaseModel): + label: str + + +def _make_dag_executor(*dag_steps_tools: tuple[str, type[BaseModel], type[BaseModel]]) -> None: + """Not used — helpers are defined inline per test for clarity.""" + + +def _build_dag_executor(flow: DAGFlow, *tools: Tool) -> FlowExecutor: + registry = FlowRegistry() + registry.register_flow(flow) + ex = FlowExecutor(registry=registry) + for t in tools: + ex.register_tool(t) + return ex + + +class TestSimpleDAG: + """Single-path DAG: A → B (equivalent to a two-step linear flow).""" + + def test_two_step_dag_succeeds(self) -> None: + class InpA(BaseModel): + x: int + + class OutA(BaseModel): + y: int + + class OutB(BaseModel): + z: int + + tool_a = Tool( + name="step_a", + description="Doubles x.", + input_schema=InpA, + output_schema=OutA, + fn=lambda inp: {"y": inp.x * 2}, + ) + tool_b = Tool( + name="step_b", + description="Adds 1 to y.", + input_schema=OutA, + output_schema=OutB, + fn=lambda inp: {"z": inp.y + 1}, + ) + flow = DAGFlow( + name="simple_dag", + description="A → B", + steps=[ + DAGFlowStep(tool_name="step_a", step_id="A", depends_on=[]), + DAGFlowStep( + tool_name="step_b", + step_id="B", + depends_on=["A"], + input_mapping={"y": "y"}, + ), + ], + ) + ex = _build_dag_executor(flow, tool_a, tool_b) + result = ex.execute_flow("simple_dag", {"x": 3}) + + assert result.success is True + assert result.final_output is not None + assert result.final_output["z"] == 7 # (3*2)+1 + assert len(result.execution_log) == 2 + assert [r.tool_name for r in result.execution_log] == ["step_a", "step_b"] + + +class TestSingleNodeDAG: + """A DAG with exactly one step and no edges.""" + + def test_single_node_dag_succeeds(self) -> None: + class Inp(BaseModel): + n: int + + class Out(BaseModel): + result: int + + tool = Tool( + name="lone", + description="Identity.", + input_schema=Inp, + output_schema=Out, + fn=lambda inp: {"result": inp.n}, + ) + flow = DAGFlow( + name="lone_dag", + description="Single node.", + steps=[DAGFlowStep(tool_name="lone", step_id="ONLY", depends_on=[])], + ) + ex = _build_dag_executor(flow, tool) + result = ex.execute_flow("lone_dag", {"n": 42}) + + assert result.success is True + assert result.final_output is not None + assert result.final_output["result"] == 42 + assert len(result.execution_log) == 1 + + +class TestDiamondDAG: + """Diamond pattern: A → (B, C) → D.""" + + def test_diamond_topology_all_steps_execute(self) -> None: + """A produces 'a_out'; B and C each read 'a_out' and produce distinct + keys; D reads both B and C outputs.""" + + class AInp(BaseModel): + seed: int + + class AOut(BaseModel): + a_out: int + + class BOut(BaseModel): + b_out: int + + class COut(BaseModel): + c_out: int + + class DInp(BaseModel): + b_out: int + c_out: int + + class DOut(BaseModel): + total: int + + tool_a = Tool( + name="t_a", + description="Seed * 2.", + input_schema=AInp, + output_schema=AOut, + fn=lambda inp: {"a_out": inp.seed * 2}, + ) + tool_b = Tool( + name="t_b", + description="a_out + 10.", + input_schema=AOut, + output_schema=BOut, + fn=lambda inp: {"b_out": inp.a_out + 10}, + ) + tool_c = Tool( + name="t_c", + description="a_out + 100.", + input_schema=AOut, + output_schema=COut, + fn=lambda inp: {"c_out": inp.a_out + 100}, + ) + tool_d = Tool( + name="t_d", + description="b_out + c_out.", + input_schema=DInp, + output_schema=DOut, + fn=lambda inp: {"total": inp.b_out + inp.c_out}, + ) + flow = DAGFlow( + name="diamond", + description="A → (B, C) → D", + steps=[ + DAGFlowStep( + tool_name="t_a", + step_id="A", + depends_on=[], + ), + DAGFlowStep( + tool_name="t_b", + step_id="B", + depends_on=["A"], + input_mapping={"a_out": "a_out"}, + ), + DAGFlowStep( + tool_name="t_c", + step_id="C", + depends_on=["A"], + input_mapping={"a_out": "a_out"}, + ), + DAGFlowStep( + tool_name="t_d", + step_id="D", + depends_on=["B", "C"], + input_mapping={"b_out": "b_out", "c_out": "c_out"}, + ), + ], + ) + ex = _build_dag_executor(flow, tool_a, tool_b, tool_c, tool_d) + result = ex.execute_flow("diamond", {"seed": 5}) + + # seed=5 → a_out=10, b_out=20, c_out=110, total=130 + assert result.success is True + assert result.final_output is not None + assert result.final_output["a_out"] == 10 + assert result.final_output["b_out"] == 20 + assert result.final_output["c_out"] == 110 + assert result.final_output["total"] == 130 + assert len(result.execution_log) == 4 + + def test_diamond_execution_log_order(self) -> None: + """Within each level steps execute in definition order.""" + + class Val(BaseModel): + v: int + + class AO(BaseModel): + av: int + + class BO(BaseModel): + bv: int + + class CO(BaseModel): + cv: int + + class DO(BaseModel): + dv: int + + ta = Tool( + name="ta", + description="a", + input_schema=Val, + output_schema=AO, + fn=lambda i: {"av": i.v}, + ) + tb = Tool( + name="tb", + description="b", + input_schema=AO, + output_schema=BO, + fn=lambda i: {"bv": i.av + 1}, + ) + tc = Tool( + name="tc", + description="c", + input_schema=AO, + output_schema=CO, + fn=lambda i: {"cv": i.av + 2}, + ) + + class TDInp(BaseModel): + bv: int + cv: int + + td = Tool( + name="td", + description="d", + input_schema=TDInp, + output_schema=DO, + fn=lambda i: {"dv": i.bv + i.cv}, + ) + + flow = DAGFlow( + name="diamond_order", + description="Order test.", + steps=[ + DAGFlowStep(tool_name="ta", step_id="A", depends_on=[]), + DAGFlowStep( + tool_name="tb", + step_id="B", + depends_on=["A"], + input_mapping={"av": "av"}, + ), + DAGFlowStep( + tool_name="tc", + step_id="C", + depends_on=["A"], + input_mapping={"av": "av"}, + ), + DAGFlowStep( + tool_name="td", + step_id="D", + depends_on=["B", "C"], + input_mapping={"bv": "bv", "cv": "cv"}, + ), + ], + ) + ex = _build_dag_executor(flow, ta, tb, tc, td) + result = ex.execute_flow("diamond_order", {"v": 0}) + + assert result.success is True + names = [r.tool_name for r in result.execution_log] + # A must come first; D must come last. + assert names[0] == "ta" + assert names[-1] == "td" + assert set(names[1:3]) == {"tb", "tc"} + + +class TestMixedDepthDAG: + """Non-uniform depth: A → B → D, A → C → D (mixed depth chains).""" + + def test_mixed_depth_all_steps_complete(self) -> None: + class S(BaseModel): + v: int + + class BOut(BaseModel): + b_v: int + + class COut(BaseModel): + c_v: int + + class DOOut(BaseModel): + final: int + + class DIInp(BaseModel): + b_v: int + c_v: int + + ta = Tool( + name="ma", + description="a", + input_schema=S, + output_schema=S, + fn=lambda i: {"v": i.v + 1}, + ) + tb = Tool( + name="mb", + description="b", + input_schema=S, + output_schema=BOut, + fn=lambda i: {"b_v": i.v + 10}, + ) + tc = Tool( + name="mc", + description="c", + input_schema=S, + output_schema=COut, + fn=lambda i: {"c_v": i.v + 100}, + ) + td = Tool( + name="md", + description="d", + input_schema=DIInp, + output_schema=DOOut, + fn=lambda i: {"final": i.b_v + i.c_v}, + ) + + flow = DAGFlow( + name="mixed_depth", + description="A → B, A → C, (B,C) → D with renamed keys", + steps=[ + DAGFlowStep(tool_name="ma", step_id="A", depends_on=[]), + DAGFlowStep( + tool_name="mb", + step_id="B", + depends_on=["A"], + input_mapping={"v": "v"}, + ), + DAGFlowStep( + tool_name="mc", + step_id="C", + depends_on=["A"], + input_mapping={"v": "v"}, + ), + DAGFlowStep( + tool_name="md", + step_id="D", + depends_on=["B", "C"], + input_mapping={"b_v": "b_v", "c_v": "c_v"}, + ), + ], + ) + + ex = _build_dag_executor(flow, ta, tb, tc, td) + result = ex.execute_flow("mixed_depth", {"v": 0}) + + # A: v=1, B: b_v=11, C: c_v=101, D: final=112 + assert result.success is True + assert result.final_output is not None + assert result.final_output["final"] == 112 + assert len(result.execution_log) == 4 + + +class TestDAGSiblingKeyConflict: + """Two sibling steps producing the same output key → deterministic error.""" + + def test_sibling_conflict_fails_gracefully(self) -> None: + class Inp(BaseModel): + x: int + + class Out(BaseModel): + v: int # BOTH siblings write "v" → conflict + + ta = Tool( + name="sa", + description="a", + input_schema=Inp, + output_schema=Out, + fn=lambda i: {"v": i.x}, + ) + tb = Tool( + name="sb", + description="b", + input_schema=Inp, + output_schema=Out, + fn=lambda i: {"v": i.x * 2}, + ) + + flow = DAGFlow( + name="conflict_dag", + description="Two independent steps writing the same key.", + steps=[ + DAGFlowStep(tool_name="sa", step_id="A", depends_on=[]), + DAGFlowStep( + tool_name="sb", + step_id="B", + depends_on=[], + input_mapping={"x": "x"}, + ), + ], + ) + ex = _build_dag_executor(flow, ta, tb) + result = ex.execute_flow("conflict_dag", {"x": 5}) + + assert result.success is False + assert any(isinstance(r.error, FlowExecutionError) for r in result.execution_log) + + def test_non_conflicting_siblings_succeed(self) -> None: + class Inp(BaseModel): + x: int + + class OutA(BaseModel): + left: int + + class OutB(BaseModel): + right: int + + ta = Tool( + name="nca", + description="a", + input_schema=Inp, + output_schema=OutA, + fn=lambda i: {"left": i.x}, + ) + tb = Tool( + name="ncb", + description="b", + input_schema=Inp, + output_schema=OutB, + fn=lambda i: {"right": i.x * 2}, + ) + + flow = DAGFlow( + name="no_conflict_dag", + description="Two independent steps with distinct keys.", + steps=[ + DAGFlowStep(tool_name="nca", step_id="A", depends_on=[]), + DAGFlowStep( + tool_name="ncb", + step_id="B", + depends_on=[], + input_mapping={"x": "x"}, + ), + ], + ) + ex = _build_dag_executor(flow, ta, tb) + result = ex.execute_flow("no_conflict_dag", {"x": 3}) + + assert result.success is True + assert result.final_output is not None + assert result.final_output["left"] == 3 + assert result.final_output["right"] == 6 + + +class TestDAGFlowLevelSchemas: + """Optional input_schema / output_schema on DAGFlow.""" + + def test_valid_input_schema_passes(self) -> None: + class Inp(BaseModel): + n: int + + class Out(BaseModel): + doubled: int + + ta = Tool( + name="ds", + description="d", + input_schema=Inp, + output_schema=Out, + fn=lambda i: {"doubled": i.n * 2}, + ) + + class OutSchema(BaseModel): + doubled: int + + flow = DAGFlow( + name="schema_dag", + description="With schemas.", + steps=[ + DAGFlowStep( + tool_name="ds", + step_id="A", + depends_on=[], + input_mapping={"n": "n"}, + ) + ], + input_schema=Inp, + output_schema=OutSchema, + ) + ex = _build_dag_executor(flow, ta) + result = ex.execute_flow("schema_dag", {"n": 4}) + + assert result.success is True + assert result.final_output is not None + assert result.final_output["doubled"] == 8 + + def test_invalid_input_schema_caught_before_execution(self) -> None: + class Inp(BaseModel): + n: int + + class Out(BaseModel): + doubled: int + + ta = Tool( + name="ds2", + description="d2", + input_schema=Inp, + output_schema=Out, + fn=lambda i: {"doubled": i.n * 2}, + ) + flow = DAGFlow( + name="guard_dag", + description="Guards input.", + steps=[DAGFlowStep(tool_name="ds2", step_id="A", depends_on=[])], + input_schema=Inp, + ) + ex = _build_dag_executor(flow, ta) + result = ex.execute_flow("guard_dag", {"wrong": "value"}) + + assert result.success is False + assert len(result.execution_log) == 1 + assert result.execution_log[0].step_index == -1 + assert isinstance(result.execution_log[0].error, SchemaValidationError) + + +class TestDAGLinearBackwardCompat: + """Existing linear Flow must be completely unaffected by DAG changes.""" + + def test_linear_flow_still_works(self, executor: FlowExecutor) -> None: + result = executor.execute_flow("double_add_format", {"number": 5}) + assert result.success is True + assert result.final_output is not None + assert result.final_output["result"] == "Final value: 20" + + def test_linear_flow_error_path_still_works(self, executor: FlowExecutor) -> None: + result = executor.execute_flow("double_add_format", {"number": "bad"}) + assert result.success is False + assert isinstance(result.execution_log[0].error, SchemaValidationError) + + +class TestDAGMissingTool: + """A DAGFlow step that references an unregistered tool fails gracefully.""" + + def test_missing_tool_fails_step(self) -> None: + flow = DAGFlow( + name="missing_tool_dag", + description="Step references an unregistered tool.", + steps=[DAGFlowStep(tool_name="ghost", step_id="G", depends_on=[])], + ) + registry = FlowRegistry() + registry.register_flow(flow) + ex = FlowExecutor(registry=registry) + # No tools registered. + result = ex.execute_flow("missing_tool_dag", {}) + + assert result.success is False + assert len(result.execution_log) == 1 + assert isinstance(result.execution_log[0].error, ToolNotFoundError) + + +class TestDAGStepType: + """step_type field is present and defaults to 'tool'; capability_id is None.""" + + def test_step_type_default(self) -> None: + step = DAGFlowStep(tool_name="t", step_id="s", depends_on=[]) + assert step.step_type == "tool" + assert step.capability_id is None + + def test_step_type_capability_accepted(self) -> None: + step = DAGFlowStep( + tool_name="t", + step_id="s", + depends_on=[], + step_type="capability", + capability_id="org.example.my_cap", + ) + assert step.step_type == "capability" + assert step.capability_id == "org.example.my_cap" diff --git a/tests/test_registry.py b/tests/test_registry.py index a8d36ac..fbf0496 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -4,8 +4,8 @@ import pytest -from chainweaver.exceptions import FlowAlreadyExistsError, FlowNotFoundError -from chainweaver.flow import Flow, FlowStep +from chainweaver.exceptions import DAGDefinitionError, FlowAlreadyExistsError, FlowNotFoundError +from chainweaver.flow import DAGFlow, DAGFlowStep, Flow, FlowStep from chainweaver.registry import FlowRegistry # --------------------------------------------------------------------------- @@ -147,3 +147,104 @@ def test_register_flow_then_overwrite_preserves_count(self) -> None: registry.register_flow(new_flow, overwrite=True) assert len(registry) == 2 assert registry.get_flow("replace_me").description == "Replaced" + + +# --------------------------------------------------------------------------- +# DAGFlow registration +# --------------------------------------------------------------------------- + + +def _make_dag(name: str = "dag", *, steps: list[DAGFlowStep] | None = None) -> DAGFlow: + if steps is None: + steps = [ + DAGFlowStep(tool_name="a", step_id="A", depends_on=[]), + DAGFlowStep(tool_name="b", step_id="B", depends_on=["A"]), + ] + return DAGFlow(name=name, description=f"Test DAG '{name}'.", steps=steps) + + +class TestDAGFlowRegistration: + def test_valid_dag_registers(self) -> None: + registry = FlowRegistry() + registry.register_flow(_make_dag("valid")) + assert "valid" in registry.list_flows() + + def test_dag_and_linear_coexist(self) -> None: + registry = FlowRegistry() + registry.register_flow(_make_flow("linear")) + registry.register_flow(_make_dag("dag")) + assert set(registry.list_flows()) == {"linear", "dag"} + + def test_dag_duplicate_step_id_raises(self) -> None: + steps = [ + DAGFlowStep(tool_name="a", step_id="DUP", depends_on=[]), + DAGFlowStep(tool_name="b", step_id="DUP", depends_on=["DUP"]), + ] + flow = DAGFlow(name="dup_dag", description="Duplicate IDs.", steps=steps) + registry = FlowRegistry() + with pytest.raises(DAGDefinitionError) as exc_info: + registry.register_flow(flow) + assert exc_info.value.reason == "duplicate_step_id" + assert exc_info.value.flow_name == "dup_dag" + + def test_dag_unknown_dependency_raises(self) -> None: + steps = [ + DAGFlowStep(tool_name="a", step_id="A", depends_on=["GHOST"]), + ] + flow = DAGFlow(name="unknown_dag", description="Unknown dep.", steps=steps) + registry = FlowRegistry() + with pytest.raises(DAGDefinitionError) as exc_info: + registry.register_flow(flow) + assert exc_info.value.reason == "unknown_dependency" + + def test_dag_cycle_raises(self) -> None: + steps = [ + DAGFlowStep(tool_name="a", step_id="A", depends_on=["B"]), + DAGFlowStep(tool_name="b", step_id="B", depends_on=["A"]), + ] + flow = DAGFlow(name="cycle_dag", description="Cycle A↔B.", steps=steps) + registry = FlowRegistry() + with pytest.raises(DAGDefinitionError) as exc_info: + registry.register_flow(flow) + assert exc_info.value.reason == "cycle" + + def test_dag_self_loop_raises(self) -> None: + steps = [DAGFlowStep(tool_name="a", step_id="A", depends_on=["A"])] + flow = DAGFlow(name="self_loop", description="Self-dep.", steps=steps) + registry = FlowRegistry() + with pytest.raises(DAGDefinitionError) as exc_info: + registry.register_flow(flow) + assert exc_info.value.reason in ("unknown_dependency", "cycle") + + def test_dag_overwrite_reruns_validation(self) -> None: + registry = FlowRegistry() + registry.register_flow(_make_dag("dag_v1")) + + bad_steps = [ + DAGFlowStep(tool_name="x", step_id="X", depends_on=["Y"]), + DAGFlowStep(tool_name="y", step_id="Y", depends_on=["X"]), + ] + bad_dag = DAGFlow(name="dag_v1", description="Cycle.", steps=bad_steps) + with pytest.raises(DAGDefinitionError): + registry.register_flow(bad_dag, overwrite=True) + # Original should still be present (overwrite never committed). + assert "dag_v1" in registry.list_flows() + + def test_dag_get_flow_returns_dag_instance(self) -> None: + registry = FlowRegistry() + dag = _make_dag("typed") + registry.register_flow(dag) + retrieved = registry.get_flow("typed") + assert isinstance(retrieved, DAGFlow) + + def test_dag_match_by_intent(self) -> None: + registry = FlowRegistry() + dag = DAGFlow( + name="process_events", + description="Processes incoming event streams in parallel.", + steps=[DAGFlowStep(tool_name="t", step_id="T", depends_on=[])], + ) + registry.register_flow(dag) + match = registry.match_flow_by_intent("event streams") + assert match is not None + assert match.name == "process_events" From aeb528c36030ffb0741279bb7e93b98577594d92 Mon Sep 17 00:00:00 2001 From: Diogo Andre Santos Date: Tue, 14 Apr 2026 06:38:53 +0100 Subject: [PATCH 2/3] fix: address 6 review comments on DAG execution PR - Fix _compute_dag_levels() to use TopologicalSorter order, preventing KeyError when steps are listed in non-dependency order in flow.steps - Add step_type guard in _execute_dag_flow() that rejects non-tool steps with a structured FlowExecutionError (capability reserved for KernelBackedExecutor) - Add Pydantic model_validator on DAGFlowStep: step_type='tool' must have capability_id=None (capability steps left unconstrained for future spec) - Fix DAGFlow docstring: DAGDefinitionError is raised when validate_dag_topology() is invoked, not at model-validation time - Fix _compute_dag_levels() docstring: acknowledge belt-and-suspenders validate_dag_topology() call can raise DAGDefinitionError - Reconcile parallel execution version reference: v0.4 -> v0.2 in architecture.md and flow.py DAGFlow docstring (aligned with README) - Add tests: reverse-ordered steps, capability step rejection, tool-step-with-capability_id rejection, capability-without-id acceptance --- chainweaver/executor.py | 55 ++++++++++++++--- chainweaver/flow.py | 23 +++++-- docs/agent-context/architecture.md | 2 +- tests/test_flow_execution.py | 97 ++++++++++++++++++++++++++++++ 4 files changed, 161 insertions(+), 16 deletions(-) diff --git a/chainweaver/executor.py b/chainweaver/executor.py index d1b2485..54b93e8 100644 --- a/chainweaver/executor.py +++ b/chainweaver/executor.py @@ -8,6 +8,7 @@ from __future__ import annotations from dataclasses import dataclass, field +from graphlib import TopologicalSorter from typing import Any from pydantic import ValidationError @@ -377,11 +378,15 @@ def _compute_dag_levels(self, flow: DAGFlow) -> list[list[DAGFlowStep]]: Steps in the same level can conceptually run in parallel; today they run sequentially in list order. - The topology is already validated at registration time, so this - method cannot raise :class:`~chainweaver.exceptions.DAGDefinitionError`. - The ``validate_dag_topology`` call here is a belt-and-suspenders guard + Topology is normally validated at registration time. This method + still calls ``validate_dag_topology`` as a belt-and-suspenders guard for flows that are created and executed without going through - :class:`~chainweaver.registry.FlowRegistry`. + :class:`~chainweaver.registry.FlowRegistry`, so invalid DAGs may + raise :class:`~chainweaver.exceptions.DAGDefinitionError` here. + + Level computation uses :class:`graphlib.TopologicalSorter` to iterate + steps in dependency order, so the result is correct regardless of the + declaration order of steps in ``flow.steps``. Args: flow: A valid :class:`~chainweaver.flow.DAGFlow`. @@ -392,18 +397,23 @@ def _compute_dag_levels(self, flow: DAGFlow) -> list[list[DAGFlowStep]]: """ validate_dag_topology(flow) step_by_id = {s.step_id: s for s in flow.steps} + graph: dict[str, set[str]] = {s.step_id: set(s.depends_on) for s in flow.steps} + sorter: TopologicalSorter[str] = TopologicalSorter(graph) + topo_order = list(sorter.static_order()) + # level[step_id] = 0-based level index levels: dict[str, int] = {} - for step in flow.steps: + for step_id in topo_order: + step = step_by_id[step_id] if not step.depends_on: - levels[step.step_id] = 0 + levels[step_id] = 0 else: - levels[step.step_id] = max(levels[dep] for dep in step.depends_on) + 1 + levels[step_id] = max(levels[dep] for dep in step.depends_on) + 1 max_level = max(levels.values(), default=-1) grouped: list[list[DAGFlowStep]] = [[] for _ in range(max_level + 1)] - for step in flow.steps: - grouped[levels[step.step_id]].append(step_by_id[step.step_id]) + for step_id in topo_order: + grouped[levels[step_id]].append(step_by_id[step_id]) return grouped def _execute_dag_flow( @@ -462,6 +472,33 @@ def _execute_dag_flow( level_records: list[StepRecord] = [] for step in level_steps: + # Reject non-tool step types until KernelBackedExecutor exists. + if step.step_type != "tool": + err = FlowExecutionError( + step.tool_name, + flat_index, + f"Step '{step.step_id}' has step_type='{step.step_type}' " + f"which is not supported by FlowExecutor. " + f"Only step_type='tool' can be executed.", + ) + log_step_error(_logger, flat_index, step.tool_name, err) + log.extend(level_records) + log.append( + StepRecord( + step_index=flat_index, + tool_name=step.tool_name, + inputs={}, + error=err, + success=False, + ) + ) + return ExecutionResult( + flow_name=flow.name, + success=False, + final_output=None, + execution_log=log, + ) + # Build a lightweight FlowStep-compatible view so _execute_step # can be reused without modification. proxy = FlowStep( diff --git a/chainweaver/flow.py b/chainweaver/flow.py index b082d5b..4f23c96 100644 --- a/chainweaver/flow.py +++ b/chainweaver/flow.py @@ -16,7 +16,7 @@ from graphlib import CycleError, TopologicalSorter from typing import Any, Literal -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, ConfigDict, Field, model_validator from chainweaver.exceptions import DAGDefinitionError @@ -153,14 +153,25 @@ class DAGFlowStep(FlowStep): step_type: Literal["tool", "capability"] = "tool" capability_id: str | None = None + @model_validator(mode="after") + def _check_tool_has_no_capability_id(self) -> DAGFlowStep: + """Ensure ``capability_id`` is ``None`` when ``step_type`` is ``'tool'``.""" + if self.step_type == "tool" and self.capability_id is not None: + msg = ( + f"Step '{self.step_id}' has step_type='tool' but capability_id=" + f"'{self.capability_id}'. capability_id must be None for tool steps." + ) + raise ValueError(msg) + return self + class DAGFlow(BaseModel): """A deterministic, DAG-structured sequence of tool invocations. Steps are ordered by their ``depends_on`` declarations. Independent steps (no unmet predecessors) form an execution *level* and run - sequentially within that level (parallel execution is a planned v0.4 - optimisation). + sequentially within that level (parallel/async execution for independent + levels is planned for v0.2). Attributes: name: Unique identifier for the flow. @@ -179,9 +190,9 @@ class DAGFlow(BaseModel): Raises: DAGDefinitionError: If topology is invalid (cycle, duplicate - ``step_id``, or unknown ``depends_on`` reference). Raised at - model-validation time so callers learn about the error before any - execution attempt. + ``step_id``, or unknown ``depends_on`` reference) when + :func:`validate_dag_topology` is invoked, such as during flow + registration or before execution. Example:: diff --git a/docs/agent-context/architecture.md b/docs/agent-context/architecture.md index ee67a02..740ccfc 100644 --- a/docs/agent-context/architecture.md +++ b/docs/agent-context/architecture.md @@ -37,7 +37,7 @@ and tools, the same flow produces the same output every time. | Decision | Rationale | |----------|-----------| | Sequential-only execution for linear `Flow` | Phase 1 MVP. Unchanged. | -| DAG execution for `DAGFlow` | Phase 2: topological level grouping. Parallel execution of a level is a future v0.4 optimisation. | +| DAG execution for `DAGFlow` | Phase 2: topological level grouping. Parallel/async execution for independent levels is planned for v0.2. | | Pydantic for all schemas | Deterministic I/O contracts between steps. | | No LLM calls in executor | "Compiled, not interpreted." | | `from __future__ import annotations` | Forward-reference support; cleaner type hints. | diff --git a/tests/test_flow_execution.py b/tests/test_flow_execution.py index d8c1b7b..5fca289 100644 --- a/tests/test_flow_execution.py +++ b/tests/test_flow_execution.py @@ -1313,3 +1313,100 @@ def test_step_type_capability_accepted(self) -> None: ) assert step.step_type == "capability" assert step.capability_id == "org.example.my_cap" + + def test_tool_step_with_capability_id_rejected(self) -> None: + with pytest.raises(ValidationError, match="capability_id must be None"): + DAGFlowStep( + tool_name="t", + step_id="s", + depends_on=[], + step_type="tool", + capability_id="org.example.invalid", + ) + + def test_capability_step_without_capability_id_accepted(self) -> None: + step = DAGFlowStep( + tool_name="t", + step_id="s", + depends_on=[], + step_type="capability", + ) + assert step.step_type == "capability" + assert step.capability_id is None + + +class TestDAGReverseOrderedSteps: + """Steps listed in reverse dependency order must still execute correctly.""" + + def test_reverse_ordered_steps_succeed(self) -> None: + class Inp(BaseModel): + x: int + + class Mid(BaseModel): + y: int + + class Out(BaseModel): + z: int + + tool_a = Tool( + name="ta", + description="Doubles x.", + input_schema=Inp, + output_schema=Mid, + fn=lambda inp: {"y": inp.x * 2}, + ) + tool_b = Tool( + name="tb", + description="Adds 1 to y.", + input_schema=Mid, + output_schema=Out, + fn=lambda inp: {"z": inp.y + 1}, + ) + # B depends on A, but B is listed FIRST in steps. + flow = DAGFlow( + name="reverse_order", + description="B before A in list, A before B in deps.", + steps=[ + DAGFlowStep( + tool_name="tb", + step_id="B", + depends_on=["A"], + input_mapping={"y": "y"}, + ), + DAGFlowStep(tool_name="ta", step_id="A", depends_on=[]), + ], + ) + ex = _build_dag_executor(flow, tool_a, tool_b) + result = ex.execute_flow("reverse_order", {"x": 5}) + + assert result.success is True + assert result.final_output is not None + assert result.final_output["z"] == 11 # (5*2)+1 + + +class TestDAGCapabilityStepExecution: + """FlowExecutor must reject step_type='capability' with a clear error.""" + + def test_capability_step_rejected_at_execution(self) -> None: + flow = DAGFlow( + name="cap_dag", + description="One capability step.", + steps=[ + DAGFlowStep( + tool_name="t", + step_id="C", + depends_on=[], + step_type="capability", + capability_id="org.example.cap", + ), + ], + ) + registry = FlowRegistry() + registry.register_flow(flow) + ex = FlowExecutor(registry=registry) + + result = ex.execute_flow("cap_dag", {}) + assert result.success is False + assert len(result.execution_log) == 1 + assert isinstance(result.execution_log[0].error, FlowExecutionError) + assert "not supported by FlowExecutor" in str(result.execution_log[0].error) From 0386baa43f6f12042e986a74f9b4929c7a5aaee1 Mon Sep 17 00:00:00 2001 From: Diogo Andre Santos Date: Tue, 14 Apr 2026 06:51:44 +0100 Subject: [PATCH 3/3] fix: address second round of review comments on DAG PR - Export validate_dag_topology in __init__.py __all__ (invariant 5) - Update __init__.py docstring to include DAG types in API example - Update executor TODO: replace completed DAG item with parallel/async level execution which is still pending - Restore two removed flow.py TODOs for conditional branching and determinism scoring (still unimplemented) - Add DAG output schema failure test (TestDAGFlowLevelSchemas) - Remove dead code: unused _Doubled/_Tripled/_Sum/_Formatted classes and unused _make_dag_executor stub in tests --- chainweaver/__init__.py | 9 +++++-- chainweaver/executor.py | 4 +-- chainweaver/flow.py | 7 +++++ tests/test_flow_execution.py | 50 +++++++++++++++++++++--------------- 4 files changed, 46 insertions(+), 24 deletions(-) diff --git a/chainweaver/__init__.py b/chainweaver/__init__.py index ac27131..a395aa1 100644 --- a/chainweaver/__init__.py +++ b/chainweaver/__init__.py @@ -5,9 +5,13 @@ .. code-block:: python - from chainweaver import Tool, Flow, FlowStep, FlowRegistry, FlowExecutor + from chainweaver import ( + Tool, Flow, FlowStep, DAGFlow, DAGFlowStep, + FlowRegistry, FlowExecutor, validate_dag_topology, + ) from chainweaver.exceptions import ( ChainWeaverError, + DAGDefinitionError, ToolNotFoundError, FlowNotFoundError, FlowAlreadyExistsError, @@ -35,7 +39,7 @@ ToolNotFoundError, ) from chainweaver.executor import ExecutionResult, FlowExecutor, StepRecord -from chainweaver.flow import DAGFlow, DAGFlowStep, Flow, FlowStep +from chainweaver.flow import DAGFlow, DAGFlowStep, Flow, FlowStep, validate_dag_topology from chainweaver.registry import FlowRegistry from chainweaver.tools import Tool @@ -65,4 +69,5 @@ "ToolDefinitionError", "ToolNotFoundError", "tool", + "validate_dag_topology", ] diff --git a/chainweaver/executor.py b/chainweaver/executor.py index 54b93e8..3aa3055 100644 --- a/chainweaver/executor.py +++ b/chainweaver/executor.py @@ -108,8 +108,8 @@ class FlowExecutor: print(result.final_output) # {"result": "Final value: 20"} # TODO (Phase 2): Add async execution mode for I/O-bound tool chains. - # TODO (Phase 2): Support DAG execution with dependency resolution and - # parallel step groups. + # TODO (Phase 2): Support parallel/async execution for independent DAG + # levels (currently steps within a level run sequentially). # TODO (Phase 2): Add middleware hooks (before_step / after_step) for # observability and tracing integrations. """ diff --git a/chainweaver/flow.py b/chainweaver/flow.py index 4f23c96..5f79a18 100644 --- a/chainweaver/flow.py +++ b/chainweaver/flow.py @@ -104,6 +104,13 @@ class Flow(BaseModel): output_schema: type[BaseModel] | None = None +# TODO (Phase 2): Add conditional branching — a step that inspects +# context values and selects the next step(s) at runtime. + +# TODO (Phase 2): Add determinism scoring so that partially +# deterministic flows can be marked and handled appropriately. + + # --------------------------------------------------------------------------- # DAG model # --------------------------------------------------------------------------- diff --git a/tests/test_flow_execution.py b/tests/test_flow_execution.py index 5fca289..4773584 100644 --- a/tests/test_flow_execution.py +++ b/tests/test_flow_execution.py @@ -720,26 +720,6 @@ def test_large_negative_input(self, executor: FlowExecutor) -> None: # Shared helpers for DAG tests -class _Doubled(BaseModel): - doubled: int - - -class _Tripled(BaseModel): - tripled: int - - -class _Sum(BaseModel): - total: int - - -class _Formatted(BaseModel): - label: str - - -def _make_dag_executor(*dag_steps_tools: tuple[str, type[BaseModel], type[BaseModel]]) -> None: - """Not used — helpers are defined inline per test for clarity.""" - - def _build_dag_executor(flow: DAGFlow, *tools: Tool) -> FlowExecutor: registry = FlowRegistry() registry.register_flow(flow) @@ -1259,6 +1239,36 @@ class Out(BaseModel): assert result.execution_log[0].step_index == -1 assert isinstance(result.execution_log[0].error, SchemaValidationError) + def test_invalid_output_schema_caught_after_execution(self) -> None: + class Inp(BaseModel): + n: int + + class Out(BaseModel): + doubled: int + + class WrongOutputSchema(BaseModel): + missing_field: str # context won't have this key + + ta = Tool( + name="ds3", + description="d3", + input_schema=Inp, + output_schema=Out, + fn=lambda i: {"doubled": i.n * 2}, + ) + flow = DAGFlow( + name="bad_out_dag", + description="Output schema mismatch.", + steps=[DAGFlowStep(tool_name="ds3", step_id="A", depends_on=[])], + input_schema=Inp, + output_schema=WrongOutputSchema, + ) + ex = _build_dag_executor(flow, ta) + result = ex.execute_flow("bad_out_dag", {"n": 4}) + + assert result.success is False + assert any(isinstance(r.error, SchemaValidationError) for r in result.execution_log) + class TestDAGLinearBackwardCompat: """Existing linear Flow must be completely unaffected by DAG changes."""