From afa45c60198db27d783ccd8ba9a0089bc064982c Mon Sep 17 00:00:00 2001 From: Benjamin Date: Tue, 12 May 2026 09:26:54 +0800 Subject: [PATCH 1/5] feat(dsl,nodes): support agent_strategy plugin invocation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a first-class workflow node for Dify ``agent_strategy`` plugins plus the slim-backed runtime adapter that drives them. This unlocks Dify Studio chatflows that contain ``type: agent`` nodes — ``graphon.dsl.loads`` now accepts and executes them end-to-end. Three layers, all shipped together so the change is atomic: 1. **Slim layer** (``src/graphon/dsl/slim/agent.py``): - ``SlimAgentStrategyClient`` — thin invoker scoped by ``(plugin_id, agent_strategy_provider, agent_strategy)``. Mirrors ``SlimLLM`` and ``SlimToolNodeRuntime``: ``action_invoker`` injection for tests, ``SlimClient`` for subprocess. Errors from ``action_invoker`` propagate unchanged (caller-owned); errors from the SlimClient path are wrapped into ``SlimAgentStrategyError`` (library-owned boundary). - ``AgentRuntimeMessage`` — PEP 695 alias of ``ToolRuntimeMessage`` because the Dify Plugin SDK defines ``AgentInvokeMessage(InvokeMessage): pass`` without adding fields. The alias keeps call sites semantically clear and reserves room for divergence later. - ``SlimActionInvoker`` — DI seam. - ``_decode_agent_message`` — dedicated decoder for the agent message subset (text / link / json / log / variable / retriever_resources). Independent from the tool runtime's decoder so the agent path is not coupled to tool-specific variants (file / blob / image). 2. **Node layer** (``src/graphon/nodes/agent/``): - ``AgentNodeData`` mirrors the Dify Studio v1.7+ export shape — the ``agent_strategy_provider_name`` / ``agent_strategy_name`` / ``plugin_unique_identifier`` triple plus the ``agent_parameters`` typed-wrapper bag. - ``AgentParameterValue`` is the ``{type, value}`` wrapper Studio emits for every parameter (constant / variable / mixed), type-checked at validation time. - ``AgentNode._run`` is a streaming generator that resolves typed-wrapper parameters against the variable pool, forwards them through the injected runtime, translates each runtime message into the matching graph event — text/link → ``StreamChunkEvent`` selected by ``[node_id, "text"]``, log → ``AgentLogEvent``, json / variable → accumulated outputs — and emits one terminal ``StreamCompletedEvent``. - ``AgentNodeError`` is the node-layer boundary error type. - ``AgentNodeRuntimeProtocol`` (in ``nodes/runtime.py``) decouples the node from any specific runtime. 3. **DSL wiring** (``src/graphon/dsl/``): - ``SlimAgentNodeRuntime`` (``agent_runtime.py``) implements ``AgentNodeRuntimeProtocol`` by assembling a fresh ``SlimAgentStrategyClient`` per invocation. Stateless adapter that mirrors how ``SlimDslNodeFactory`` constructs LLM and tool slim clients per node. ``_provider_slug`` extracts the trailing segment of ``agent_strategy_provider_name`` (e.g. "langgenius/agent/agent" → "agent") to match the slim payload contract. - ``SlimDslNodeFactory.create_node`` routes ``BuiltinNodeTypes.AGENT`` to ``AgentNode`` with a slim-backed runtime built from the factory's existing ``slim_client_config``. - ``_SUPPORTED_DEFAULT_FACTORY_NODES`` in the importer now includes ``BuiltinNodeTypes.AGENT``. Tests: 28 new cases across three files cover the slim adapter (``tests/dsl/test_slim_agent.py``: 12 cases — basic / mixed-type decoding, payload contract, meta propagation, lazy partial consumption, both error-path contracts, alias identity), the node behavior (``tests/nodes/agent/test_agent_node.py``: 10 cases — text streaming, log → ``AgentLogEvent``, json / variable accumulation, all three typed-wrapper resolution modes, runtime-error wrap), the slim runtime adapter (``tests/dsl/test_slim_agent_node_runtime.py``: 4 cases — slug extraction, payload forwarding, decoded message contract, per-node client identity), and factory routing (``tests/dsl/test_node_factory_agent.py``: 2 cases). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/graphon/dsl/_provider.py | 8 + src/graphon/dsl/agent_runtime.py | 59 ++++ src/graphon/dsl/importer.py | 1 + src/graphon/dsl/node_factory.py | 24 +- src/graphon/dsl/slim/__init__.py | 10 + src/graphon/dsl/slim/agent.py | 98 ++++++ src/graphon/nodes/agent/__init__.py | 10 + src/graphon/nodes/agent/agent_node.py | 183 +++++++++++ src/graphon/nodes/agent/entities.py | 95 ++++++ src/graphon/nodes/agent/exc.py | 2 + src/graphon/nodes/runtime.py | 12 + tests/dsl/test_node_factory_agent.py | 83 +++++ tests/dsl/test_slim_agent.py | 377 ++++++++++++++++++++++ tests/dsl/test_slim_agent_node_runtime.py | 147 +++++++++ tests/nodes/agent/__init__.py | 0 tests/nodes/agent/test_agent_node.py | 292 +++++++++++++++++ 16 files changed, 1392 insertions(+), 9 deletions(-) create mode 100644 src/graphon/dsl/_provider.py create mode 100644 src/graphon/dsl/agent_runtime.py create mode 100644 src/graphon/dsl/slim/agent.py create mode 100644 src/graphon/nodes/agent/__init__.py create mode 100644 src/graphon/nodes/agent/agent_node.py create mode 100644 src/graphon/nodes/agent/entities.py create mode 100644 src/graphon/nodes/agent/exc.py create mode 100644 tests/dsl/test_node_factory_agent.py create mode 100644 tests/dsl/test_slim_agent.py create mode 100644 tests/dsl/test_slim_agent_node_runtime.py create mode 100644 tests/nodes/agent/__init__.py create mode 100644 tests/nodes/agent/test_agent_node.py diff --git a/src/graphon/dsl/_provider.py b/src/graphon/dsl/_provider.py new file mode 100644 index 0000000..a8eb740 --- /dev/null +++ b/src/graphon/dsl/_provider.py @@ -0,0 +1,8 @@ +from __future__ import annotations + + +def canonical_vendor(provider: str | None) -> str | None: + if not provider: + return None + parts = [part for part in provider.split("/") if part] + return parts[-1] if parts else provider diff --git a/src/graphon/dsl/agent_runtime.py b/src/graphon/dsl/agent_runtime.py new file mode 100644 index 0000000..0028c9c --- /dev/null +++ b/src/graphon/dsl/agent_runtime.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from collections.abc import Generator, Mapping +from dataclasses import dataclass +from typing import Any + +from graphon.nodes.agent.entities import AgentNodeData +from graphon.nodes.runtime import AgentNodeRuntimeProtocol +from graphon.nodes.tool_runtime_entities import ToolRuntimeMessage +from graphon.runtime.variable_pool import VariablePool + +from ._provider import canonical_vendor +from .slim.agent import SlimActionInvoker, SlimAgentStrategyClient +from .slim.client import SlimClientConfig + + +@dataclass(slots=True) +class SlimAgentNodeRuntime(AgentNodeRuntimeProtocol): + """Slim-backed runtime for Agent nodes. + + Args: + config: Slim client configuration (mode, plugin folder, daemon + address, etc.). Forwarded to each ``SlimAgentStrategyClient``. + action_invoker: Optional dependency-injection seam matching + ``SlimActionInvoker``. When set, slim subprocess invocation is + bypassed entirely — used in tests. + """ + + config: SlimClientConfig + action_invoker: SlimActionInvoker | None = None + + def invoke( + self, + *, + node_id: str, + node_data: AgentNodeData, + agent_strategy_params: Mapping[str, Any], + variable_pool: VariablePool | None, + ) -> Generator[ToolRuntimeMessage, None, None]: + """Invoke the strategy plugin and yield each decoded message. + + Yields: + One ``ToolRuntimeMessage`` per slim chunk in the strategy's + output stream. + """ + # node_id / variable_pool are unused at this layer: the slim + # action_invoker boundary is stateless and parameter resolution has + # already happened in AgentNode._run before invoke() is called. + _ = node_id, variable_pool + + provider_name = node_data.agent_strategy_provider_name + client = SlimAgentStrategyClient( + config=self.config, + plugin_id=node_data.plugin_unique_identifier, + agent_strategy_provider=canonical_vendor(provider_name) or provider_name, + agent_strategy=node_data.agent_strategy_name, + action_invoker=self.action_invoker, + ) + yield from client.invoke(agent_strategy_params=agent_strategy_params) diff --git a/src/graphon/dsl/importer.py b/src/graphon/dsl/importer.py index 5a9c7a8..f1ca5d4 100644 --- a/src/graphon/dsl/importer.py +++ b/src/graphon/dsl/importer.py @@ -58,6 +58,7 @@ class _DslKey(StrEnum): BuiltinNodeTypes.CODE, BuiltinNodeTypes.LLM, BuiltinNodeTypes.TOOL, + BuiltinNodeTypes.AGENT, )) diff --git a/src/graphon/dsl/node_factory.py b/src/graphon/dsl/node_factory.py index 61c5680..981bbf9 100644 --- a/src/graphon/dsl/node_factory.py +++ b/src/graphon/dsl/node_factory.py @@ -15,6 +15,8 @@ from graphon.file.models import File from graphon.model_runtime.entities.llm_entities import LLMMode from graphon.model_runtime.entities.message_entities import PromptMessage +from graphon.nodes.agent.agent_node import AgentNode +from graphon.nodes.agent.entities import AgentNodeData from graphon.nodes.answer.answer_node import AnswerNode from graphon.nodes.base.node import Node from graphon.nodes.code.code_node import CodeNode @@ -33,6 +35,8 @@ from graphon.runtime.graph_runtime_state import GraphRuntimeState from graphon.template_rendering import Jinja2TemplateRenderer, TemplateRenderError +from ._provider import canonical_vendor +from .agent_runtime import SlimAgentNodeRuntime from .code_runtime import SandboxCodeExecutor from .entities import ( DslCodeSettings, @@ -159,13 +163,6 @@ def replace(match: re.Match[str]) -> str: return _SIMPLE_JINJA_VARIABLE.sub(replace, template) -def _canonical_vendor(provider: str | None) -> str | None: - if not provider: - return None - parts = [part for part in provider.split("/") if part] - return parts[-1] if parts else provider - - def _plugin_prefix(provider: str | None) -> str | None: if not provider: return None @@ -434,6 +431,15 @@ def create_node(self, node_config: NodeConfigDict) -> Node: # noqa: C901, PLR09 tool_file_manager=_UnsupportedToolFileManager(), runtime=runtime, ) + case BuiltinNodeTypes.AGENT: + agent_data = AgentNodeData.model_validate(data_payload) + return AgentNode( + node_id=node_id, + data=agent_data, + graph_init_params=self.graph_init_params, + graph_runtime_state=self.graph_runtime_state, + runtime=SlimAgentNodeRuntime(config=self.slim_client_config), + ) case _: msg = f"Unsupported DSL node type: {node_type}" raise _dsl_error( @@ -447,7 +453,7 @@ def _create_llm_node(self, *, node_id: str, data: Mapping[str, Any]) -> LLMNode: normalized_data = dict(data) model = dict(normalized_data.get("model") or {}) raw_provider = str(model.get("provider") or "") - vendor = _canonical_vendor(raw_provider) + vendor = canonical_vendor(raw_provider) if not vendor: msg = "LLM node is missing model provider." raise _dsl_error( @@ -522,7 +528,7 @@ def _create_tool_runtime(self, node_data: ToolNodeData) -> SlimToolNodeRuntime: code="dependency.missing_plugin", details={"node_type": BuiltinNodeTypes.TOOL}, ) - provider = _canonical_vendor(node_data.provider_id) or _canonical_vendor( + provider = canonical_vendor(node_data.provider_id) or canonical_vendor( node_data.provider_name, ) if provider is None: diff --git a/src/graphon/dsl/slim/__init__.py b/src/graphon/dsl/slim/__init__.py index 840f1fa..b9c1085 100644 --- a/src/graphon/dsl/slim/__init__.py +++ b/src/graphon/dsl/slim/__init__.py @@ -1,3 +1,9 @@ +from .agent import ( + AgentRuntimeMessage, + SlimActionInvoker, + SlimAgentStrategyClient, + SlimAgentStrategyError, +) from .client import ( SlimChunkEvent, SlimClient, @@ -13,6 +19,10 @@ from .llm import SlimLLM, SlimStructuredOutputParseError __all__ = [ + "AgentRuntimeMessage", + "SlimActionInvoker", + "SlimAgentStrategyClient", + "SlimAgentStrategyError", "SlimChunkEvent", "SlimClient", "SlimClientConfig", diff --git a/src/graphon/dsl/slim/agent.py b/src/graphon/dsl/slim/agent.py new file mode 100644 index 0000000..5ab09d5 --- /dev/null +++ b/src/graphon/dsl/slim/agent.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from collections.abc import Callable, Generator, Iterable, Mapping +from dataclasses import dataclass, field +from typing import Any + +from graphon.dsl.slim.client import SlimClient, SlimClientConfig, SlimClientError +from graphon.dsl.tool_runtime import tool_runtime_message_from_payload +from graphon.nodes.tool.exc import ToolNodeError +from graphon.nodes.tool_runtime_entities import ToolRuntimeMessage + +_SLIM_ACTION_INVOKE_AGENT_STRATEGY = "invoke_agent_strategy" + + +# Dify Plugin SDK defines `AgentInvokeMessage(InvokeMessage): pass` — wire format +# is identical to tool messages, so we share the DTO and decoder. +type AgentRuntimeMessage = ToolRuntimeMessage + +type SlimActionInvoker = Callable[ + [str, str, Mapping[str, Any]], + Iterable[Mapping[str, Any]], +] + + +class SlimAgentStrategyError(RuntimeError): + """Raised when the slim ``invoke_agent_strategy`` action fails or returns + a payload that cannot be decoded. + + The original error (subprocess failure, ``SlimClientError``, decode + error, etc.) is preserved on ``__cause__`` via ``raise ... from error`` + so callers can introspect the failure source without parsing the + message string. + """ + + +@dataclass(slots=True) +class SlimAgentStrategyClient: + config: SlimClientConfig + plugin_id: str + agent_strategy_provider: str + agent_strategy: str + action_invoker: SlimActionInvoker | None = None + _client: SlimClient | None = field(init=False, repr=False) + + def __post_init__(self) -> None: + if self.action_invoker is None: + self._client = SlimClient(config=self.config) + else: + self._client = None + + def invoke( + self, + *, + agent_strategy_params: Mapping[str, Any], + ) -> Generator[AgentRuntimeMessage, None, None]: + """Invoke the agent strategy and yield each decoded message chunk.""" + data = { + "agent_strategy_provider": self.agent_strategy_provider, + "agent_strategy": self.agent_strategy, + "agent_strategy_params": dict(agent_strategy_params), + } + for payload in self._invoke_action(data): + try: + yield tool_runtime_message_from_payload(payload) + except ToolNodeError as error: + raise SlimAgentStrategyError(str(error)) from error + + def _invoke_action( + self, + data: Mapping[str, Any], + ) -> Iterable[Mapping[str, Any]]: + if self.action_invoker is not None: + return self.action_invoker( + self.plugin_id, + _SLIM_ACTION_INVOKE_AGENT_STRATEGY, + data, + ) + return self._invoke_client(data) + + def _invoke_client( + self, + data: Mapping[str, Any], + ) -> Generator[Mapping[str, Any], None, None]: + if self._client is None: + msg = "Slim client was not initialized." + raise SlimAgentStrategyError(msg) + try: + for chunk in self._client.invoke_chunks( + plugin_id=self.plugin_id, + action=_SLIM_ACTION_INVOKE_AGENT_STRATEGY, + data=data, + ): + if not isinstance(chunk, Mapping): + msg = f"Unexpected slim agent_strategy chunk: {chunk!r}" + raise SlimAgentStrategyError(msg) + yield chunk + except SlimClientError as error: + raise SlimAgentStrategyError(str(error)) from error diff --git a/src/graphon/nodes/agent/__init__.py b/src/graphon/nodes/agent/__init__.py new file mode 100644 index 0000000..a737571 --- /dev/null +++ b/src/graphon/nodes/agent/__init__.py @@ -0,0 +1,10 @@ +from graphon.nodes.agent.agent_node import AgentNode +from graphon.nodes.agent.entities import AgentNodeData, AgentParameterValue +from graphon.nodes.agent.exc import AgentNodeError + +__all__ = [ + "AgentNode", + "AgentNodeData", + "AgentNodeError", + "AgentParameterValue", +] diff --git a/src/graphon/nodes/agent/agent_node.py b/src/graphon/nodes/agent/agent_node.py new file mode 100644 index 0000000..098be68 --- /dev/null +++ b/src/graphon/nodes/agent/agent_node.py @@ -0,0 +1,183 @@ +from collections.abc import Generator, Iterable, Mapping +from typing import Any, override + +from graphon.entities.graph_init_params import GraphInitParams +from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionStatus +from graphon.node_events.agent import AgentLogEvent +from graphon.node_events.base import NodeEventBase, NodeRunResult +from graphon.node_events.node import StreamChunkEvent, StreamCompletedEvent +from graphon.nodes.agent.entities import AgentNodeData, AgentParameterValue +from graphon.nodes.agent.exc import AgentNodeError +from graphon.nodes.base.node import Node +from graphon.nodes.runtime import AgentNodeRuntimeProtocol +from graphon.nodes.tool_runtime_entities import ToolRuntimeMessage +from graphon.runtime.graph_runtime_state import GraphRuntimeState +from graphon.runtime.variable_pool import VariablePool + + +class AgentNode(Node[AgentNodeData]): + """Workflow node that invokes a Dify ``agent_strategy`` plugin.""" + + node_type = BuiltinNodeTypes.AGENT + + @override + def __init__( + self, + node_id: str, + data: AgentNodeData, + *, + graph_init_params: GraphInitParams, + graph_runtime_state: GraphRuntimeState, + runtime: AgentNodeRuntimeProtocol, + ) -> None: + super().__init__( + node_id=node_id, + data=data, + graph_init_params=graph_init_params, + graph_runtime_state=graph_runtime_state, + ) + self._runtime = runtime + + @classmethod + @override + def version(cls) -> str: + return "1" + + @override + def _run(self) -> Generator[NodeEventBase, None, None]: + variable_pool = self.graph_runtime_state.variable_pool + + try: + resolved_params = _resolve_agent_parameters( + self.node_data.agent_parameters, + variable_pool=variable_pool, + ) + except AgentNodeError as error: + yield StreamCompletedEvent( + node_run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + error=str(error), + error_type=type(error).__name__, + ), + ) + return + + text_buffer: list[str] = [] + json_outputs: list[Any] = [] + variable_outputs: dict[str, Any] = {} + execution_id = self.ensure_execution_id() + + try: + message_stream = self._runtime.invoke( + node_id=self._node_id, + node_data=self.node_data, + agent_strategy_params=resolved_params, + variable_pool=variable_pool, + ) + for message in message_stream: + yield from _dispatch_message( + message, + node_id=self._node_id, + execution_id=execution_id, + text_buffer=text_buffer, + json_outputs=json_outputs, + variable_outputs=variable_outputs, + ) + except Exception as error: # noqa: BLE001 + yield StreamCompletedEvent( + node_run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + inputs=dict(resolved_params), + error=str(error), + error_type=type(error).__name__, + ), + ) + return + + outputs: dict[str, Any] = {"text": "".join(text_buffer)} + if json_outputs: + outputs["json"] = ( + json_outputs[-1] if len(json_outputs) == 1 else list(json_outputs) + ) + outputs.update(variable_outputs) + + yield StreamCompletedEvent( + node_run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + inputs=dict(resolved_params), + outputs=outputs, + ), + ) + + +def _resolve_agent_parameters( + parameters: Mapping[str, AgentParameterValue], + *, + variable_pool: VariablePool, +) -> dict[str, Any]: + result: dict[str, Any] = {} + for name, wrapper in parameters.items(): + match wrapper.type: + case "constant": + result[name] = wrapper.value + case "mixed": + template = str(wrapper.value) if wrapper.value is not None else "" + result[name] = variable_pool.convert_template(template).text + case "variable": + if not isinstance(wrapper.value, list) or not all( + isinstance(part, str) for part in wrapper.value + ): + msg = ( + f"Variable agent parameter {name!r} must be a list of " + f"string segments." + ) + raise AgentNodeError(msg) + variable = variable_pool.get(wrapper.value) + if variable is not None: + result[name] = variable.value + return result + + +def _dispatch_message( + message: ToolRuntimeMessage, + *, + node_id: str, + execution_id: str, + text_buffer: list[str], + json_outputs: list[Any], + variable_outputs: dict[str, Any], +) -> Iterable[NodeEventBase]: + payload = message.message + match message.type: + case ToolRuntimeMessage.MessageType.TEXT | ToolRuntimeMessage.MessageType.LINK: + if isinstance(payload, ToolRuntimeMessage.TextMessage): + text_buffer.append(payload.text) + yield StreamChunkEvent( + selector=[node_id, "text"], + chunk=payload.text, + is_final=False, + ) + case ToolRuntimeMessage.MessageType.JSON: + if ( + isinstance(payload, ToolRuntimeMessage.JsonMessage) + and not payload.suppress_output + ): + json_outputs.append(payload.json_object) + case ToolRuntimeMessage.MessageType.LOG: + if isinstance(payload, ToolRuntimeMessage.LogMessage): + yield AgentLogEvent( + message_id=payload.id, + label=payload.label, + node_execution_id=execution_id, + parent_id=payload.parent_id, + error=payload.error, + status=payload.status.value, + data=dict(payload.data), + metadata=dict(payload.metadata), + node_id=node_id, + ) + case ToolRuntimeMessage.MessageType.VARIABLE: + if isinstance(payload, ToolRuntimeMessage.VariableMessage): + variable_outputs[payload.variable_name] = payload.variable_value + case _: + pass diff --git a/src/graphon/nodes/agent/entities.py b/src/graphon/nodes/agent/entities.py new file mode 100644 index 0000000..aa4578e --- /dev/null +++ b/src/graphon/nodes/agent/entities.py @@ -0,0 +1,95 @@ +from collections.abc import Mapping +from typing import Any, Literal + +from pydantic import BaseModel, Field, field_validator +from pydantic_core.core_schema import ValidationInfo + +from graphon.entities.base_node_data import BaseNodeData +from graphon.enums import BuiltinNodeTypes, NodeType + +_SUPPORTED_CONSTANT_VALUE_TYPES = (str, int, float, bool, dict, list, type(None)) +_SUPPORTED_CONSTANT_VALUE_TYPE_NAMES = ", ".join( + "None" if value_type is type(None) else value_type.__name__ + for value_type in _SUPPORTED_CONSTANT_VALUE_TYPES +) + + +class AgentParameterValue(BaseModel): + value: list[str] | str | int | float | bool | dict[str, Any] | list[Any] | None + type: Literal["constant", "variable", "mixed"] + + @field_validator("type", mode="before") + @classmethod + def check_type( + cls, + value: Any, + validation_info: ValidationInfo, + ) -> Literal["constant", "variable", "mixed"]: + wrapper_type = value + payload = validation_info.data.get("value") + + if payload is None: + return wrapper_type + + match wrapper_type: + case "mixed": + match payload: + case str(): + pass + case _: + msg = "mixed agent parameter value must be a string" + raise ValueError(msg) + case "variable": + match payload: + case list() if all(isinstance(part, str) for part in payload): + pass + case _: + msg = "variable agent parameter value must be a list of strings" + raise ValueError(msg) + case "constant": + match payload: + case str() | int() | float() | bool() | dict() | list() | None: + pass + case _: + msg = ( + f"constant agent parameter value must be one of: " + f"{_SUPPORTED_CONSTANT_VALUE_TYPE_NAMES}" + ) + raise ValueError(msg) + return wrapper_type + + +class AgentNodeData(BaseNodeData): + """Persisted configuration for an Agent node.""" + + type: NodeType = BuiltinNodeTypes.AGENT + + agent_strategy_provider_name: str = Field( + ..., + description=( + "Fully qualified provider name from the agent_strategy plugin, " + "e.g. 'langgenius/agent/agent'." + ), + ) + agent_strategy_name: str = Field( + ..., + description="Strategy name within the provider, e.g. 'function_calling'.", + ) + agent_strategy_label: str | None = Field( + default=None, + description="Human-readable strategy label emitted by Dify Studio.", + ) + plugin_unique_identifier: str = Field( + ..., + description="Marketplace plugin identifier of the agent_strategy plugin.", + ) + agent_parameters: Mapping[str, AgentParameterValue] = Field(default_factory=dict) + output_schema: Mapping[str, Any] = Field(default_factory=dict) + meta: Mapping[str, Any] = Field(default_factory=dict) + tool_node_version: str = Field( + default="2", + description=( + "Tool-node protocol version that the strategy plugin expects. " + "Forwarded for plugin compatibility; not interpreted by the node." + ), + ) diff --git a/src/graphon/nodes/agent/exc.py b/src/graphon/nodes/agent/exc.py new file mode 100644 index 0000000..dd39e48 --- /dev/null +++ b/src/graphon/nodes/agent/exc.py @@ -0,0 +1,2 @@ +class AgentNodeError(RuntimeError): + """Raised when an Agent node fails to execute or yields invalid messages.""" diff --git a/src/graphon/nodes/runtime.py b/src/graphon/nodes/runtime.py index 1426cf1..272be27 100644 --- a/src/graphon/nodes/runtime.py +++ b/src/graphon/nodes/runtime.py @@ -13,6 +13,7 @@ ) if TYPE_CHECKING: + from graphon.nodes.agent.entities import AgentNodeData from graphon.nodes.human_input.entities import HumanInputNodeData from graphon.nodes.human_input.enums import HumanInputFormStatus from graphon.nodes.tool.entities import ToolNodeData @@ -64,6 +65,17 @@ def get_usage( def build_file_reference(self, *, mapping: Mapping[str, Any]) -> Any: ... +class AgentNodeRuntimeProtocol(Protocol): + def invoke( + self, + *, + node_id: str, + node_data: AgentNodeData, + agent_strategy_params: Mapping[str, Any], + variable_pool: VariablePool | None, + ) -> Generator[ToolRuntimeMessage, None, None]: ... + + @runtime_checkable class HumanInputNodeRuntimeProtocol(Protocol): """Workflow-layer adapter for human-input runtime persistence and delivery.""" diff --git a/tests/dsl/test_node_factory_agent.py b/tests/dsl/test_node_factory_agent.py new file mode 100644 index 0000000..bb679b6 --- /dev/null +++ b/tests/dsl/test_node_factory_agent.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +import time +from pathlib import Path +from typing import cast + +from graphon.dsl.agent_runtime import SlimAgentNodeRuntime +from graphon.dsl.entities import DslCredentials, DslSlimSettings +from graphon.dsl.node_factory import SlimDslNodeFactory +from graphon.entities.graph_config import NodeConfigDict +from graphon.entities.graph_init_params import GraphInitParams +from graphon.nodes.agent import AgentNode +from graphon.runtime.graph_runtime_state import GraphRuntimeState +from graphon.runtime.variable_pool import VariablePool + + +def _factory(tmp_path: Path) -> SlimDslNodeFactory: + return SlimDslNodeFactory( + graph_config={"nodes": [], "edges": []}, + graph_init_params=GraphInitParams( + workflow_id="wf", + graph_config={"nodes": [], "edges": []}, + run_context={}, + call_depth=0, + ), + graph_runtime_state=GraphRuntimeState( + variable_pool=VariablePool.from_bootstrap(), + start_at=time.time(), + ), + credentials=DslCredentials( + slim=DslSlimSettings(mode="local", plugin_folder=str(tmp_path)), + ), + dependencies=[], + ) + + +def test_create_node_routes_agent_type_to_agent_node(tmp_path: Path) -> None: + factory = _factory(tmp_path) + node_config = { + "id": "agent-1", + "data": { + "type": "agent", + "title": "Agent", + "agent_strategy_provider_name": "langgenius/agent/agent", + "agent_strategy_name": "function_calling", + "plugin_unique_identifier": "langgenius/agent:0.0.36@hash", + "agent_parameters": { + "instruction": {"type": "constant", "value": "be helpful"}, + }, + "tool_node_version": "2", + }, + } + + node = factory.create_node(cast("NodeConfigDict", node_config)) + + assert isinstance(node, AgentNode) + assert node.node_data.agent_strategy_name == "function_calling" + assert node.node_data.plugin_unique_identifier == "langgenius/agent:0.0.36@hash" + + +def test_create_node_injects_slim_agent_node_runtime(tmp_path: Path) -> None: + """The factory's default agent runtime is the slim-backed adapter — node + instantiation must succeed without any extra wiring at the call site. + """ + factory = _factory(tmp_path) + node_config = { + "id": "agent-1", + "data": { + "type": "agent", + "title": "Agent", + "agent_strategy_provider_name": "langgenius/agent/agent", + "agent_strategy_name": "ReAct", + "plugin_unique_identifier": "langgenius/agent:0.0.36@hash", + "agent_parameters": {}, + }, + } + + node = factory.create_node(cast("NodeConfigDict", node_config)) + assert isinstance(node, AgentNode) + # Access the private slot to confirm the wired runtime is the slim adapter. + # Acceptable in tests; mirrors the pattern used elsewhere when verifying + # factory wiring. + assert isinstance(node._runtime, SlimAgentNodeRuntime) diff --git a/tests/dsl/test_slim_agent.py b/tests/dsl/test_slim_agent.py new file mode 100644 index 0000000..894f3da --- /dev/null +++ b/tests/dsl/test_slim_agent.py @@ -0,0 +1,377 @@ +from __future__ import annotations + +from collections.abc import Iterable, Mapping +from pathlib import Path +from typing import Any + +import pytest + +from graphon.dsl.slim.agent import ( + AgentRuntimeMessage, + SlimAgentStrategyClient, + SlimAgentStrategyError, +) +from graphon.dsl.slim.client import SlimClient, SlimClientConfig, SlimClientError +from graphon.nodes.tool_runtime_entities import ToolRuntimeMessage + + +def _config(tmp_path: Path) -> SlimClientConfig: + return SlimClientConfig(folder=tmp_path) + + +def _record_invoker( + chunks: Iterable[Mapping[str, Any]], + *, + captured: list[tuple[str, str, Mapping[str, Any]]], +) -> Any: + def invoker( + plugin_id: str, + action: str, + data: Mapping[str, Any], + ) -> Iterable[Mapping[str, Any]]: + captured.append((plugin_id, action, dict(data))) + return list(chunks) + + return invoker + + +def test_invoke_passes_plugin_id_action_and_payload(tmp_path: Path) -> None: + captured: list[tuple[str, str, Mapping[str, Any]]] = [] + client = SlimAgentStrategyClient( + config=_config(tmp_path), + plugin_id="langgenius/agent:0.0.36@hash", + agent_strategy_provider="agent", + agent_strategy="function_calling", + action_invoker=_record_invoker([], captured=captured), + ) + + list(client.invoke(agent_strategy_params={"instruction": "test", "query": "hi"})) + + assert captured == [ + ( + "langgenius/agent:0.0.36@hash", + "invoke_agent_strategy", + { + "agent_strategy_provider": "agent", + "agent_strategy": "function_calling", + "agent_strategy_params": {"instruction": "test", "query": "hi"}, + }, + ), + ] + + +def test_invoke_decodes_text_message(tmp_path: Path) -> None: + captured: list[tuple[str, str, Mapping[str, Any]]] = [] + client = SlimAgentStrategyClient( + config=_config(tmp_path), + plugin_id="plugin", + agent_strategy_provider="agent", + agent_strategy="function_calling", + action_invoker=_record_invoker( + [{"type": "text", "message": {"text": "hello"}}], + captured=captured, + ), + ) + + messages = list(client.invoke(agent_strategy_params={})) + + assert len(messages) == 1 + message = messages[0] + assert message.type == ToolRuntimeMessage.MessageType.TEXT + assert isinstance(message.message, ToolRuntimeMessage.TextMessage) + assert message.message.text == "hello" + + +def test_invoke_decodes_mixed_message_stream(tmp_path: Path) -> None: + captured: list[tuple[str, str, Mapping[str, Any]]] = [] + client = SlimAgentStrategyClient( + config=_config(tmp_path), + plugin_id="plugin", + agent_strategy_provider="agent", + agent_strategy="ReAct", + action_invoker=_record_invoker( + [ + { + "type": "log", + "message": { + "id": "log-1", + "label": "thought", + "status": "start", + "data": {"reasoning": "calling tool"}, + }, + }, + { + "type": "json", + "message": {"json_object": {"result": 42}}, + }, + {"type": "text", "message": {"text": "done"}}, + ], + captured=captured, + ), + ) + + messages = list(client.invoke(agent_strategy_params={})) + + assert [m.type for m in messages] == [ + ToolRuntimeMessage.MessageType.LOG, + ToolRuntimeMessage.MessageType.JSON, + ToolRuntimeMessage.MessageType.TEXT, + ] + log_message = messages[0].message + assert isinstance(log_message, ToolRuntimeMessage.LogMessage) + assert log_message.label == "thought" + assert log_message.data == {"reasoning": "calling tool"} + json_message = messages[1].message + assert isinstance(json_message, ToolRuntimeMessage.JsonMessage) + assert json_message.json_object == {"result": 42} + + +def test_invoke_propagates_meta(tmp_path: Path) -> None: + captured: list[tuple[str, str, Mapping[str, Any]]] = [] + client = SlimAgentStrategyClient( + config=_config(tmp_path), + plugin_id="plugin", + agent_strategy_provider="agent", + agent_strategy="function_calling", + action_invoker=_record_invoker( + [ + { + "type": "text", + "message": {"text": "hi"}, + "meta": {"latency": 12, "tokens": 5}, + }, + ], + captured=captured, + ), + ) + + messages = list(client.invoke(agent_strategy_params={})) + + assert messages[0].meta == {"latency": 12, "tokens": 5} + + +def test_invoke_raises_on_unknown_message_type(tmp_path: Path) -> None: + captured: list[tuple[str, str, Mapping[str, Any]]] = [] + client = SlimAgentStrategyClient( + config=_config(tmp_path), + plugin_id="plugin", + agent_strategy_provider="agent", + agent_strategy="function_calling", + action_invoker=_record_invoker( + [{"type": "ufo", "message": {"text": "??"}}], + captured=captured, + ), + ) + + with pytest.raises(SlimAgentStrategyError, match="message type 'ufo'"): + list(client.invoke(agent_strategy_params={})) + + +def test_invoke_raises_when_action_invoker_yields_slim_client_error( + tmp_path: Path, +) -> None: + error_message = "daemon offline" + + def failing_invoker( + plugin_id: str, + action: str, + data: Mapping[str, Any], + ) -> Iterable[Mapping[str, Any]]: + _ = plugin_id, action, data + raise SlimClientError(error_message) + + client = SlimAgentStrategyClient( + config=_config(tmp_path), + plugin_id="plugin", + agent_strategy_provider="agent", + agent_strategy="function_calling", + action_invoker=failing_invoker, + ) + + # SlimClientError raised inside the invoker propagates directly when an + # action_invoker is injected (it bypasses the SlimClient bridge). This + # captures the dependency-injection contract: callers can raise any error + # they want from the invoker. + with pytest.raises(SlimClientError, match="daemon offline"): + list(client.invoke(agent_strategy_params={})) + + +def test_invoke_supports_partial_consumption(tmp_path: Path) -> None: + """``invoke`` is a lazy generator — partial consumption is supported. + + The caller may take only the first ``N`` chunks and close the generator; + the upstream action invoker must be a coroutine-aware generator that + receives ``GeneratorExit`` cleanly when ``close()`` is called. + """ + chunks_produced: list[int] = [] + + def lazy_invoker( + plugin_id: str, + action: str, + data: Mapping[str, Any], + ) -> Iterable[Mapping[str, Any]]: + _ = plugin_id, action, data + for index in range(5): + chunks_produced.append(index) + yield {"type": "text", "message": {"text": f"chunk-{index}"}} + + client = SlimAgentStrategyClient( + config=_config(tmp_path), + plugin_id="plugin", + agent_strategy_provider="agent", + agent_strategy="function_calling", + action_invoker=lazy_invoker, + ) + + iterator = client.invoke(agent_strategy_params={}) + first = next(iterator) + iterator.close() + + assert isinstance(first.message, ToolRuntimeMessage.TextMessage) + assert first.message.text == "chunk-0" + # Only the first chunk was produced before close() — confirms laziness. + assert chunks_produced == [0] + + +def test_invoke_passes_through_empty_stream(tmp_path: Path) -> None: + captured: list[tuple[str, str, Mapping[str, Any]]] = [] + client = SlimAgentStrategyClient( + config=_config(tmp_path), + plugin_id="plugin", + agent_strategy_provider="agent", + agent_strategy="function_calling", + action_invoker=_record_invoker([], captured=captured), + ) + + messages = list(client.invoke(agent_strategy_params={"x": 1})) + + assert messages == [] + assert len(captured) == 1 + + +def test_invoke_isolates_params_from_caller_mutations(tmp_path: Path) -> None: + """Caller-side mutation of ``agent_strategy_params`` after ``invoke`` must + not leak into the data sent to the action invoker. + + Confirms the shallow ``dict(...)`` copy in ``SlimAgentStrategyClient.invoke`` + is sufficient defensive isolation for the top-level mapping. (Deep copies + of nested mutable values are deliberately not made — callers must not + mutate nested values mid-flight either, but that is out of contract.) + """ + captured: list[tuple[str, str, Mapping[str, Any]]] = [] + client = SlimAgentStrategyClient( + config=_config(tmp_path), + plugin_id="plugin", + agent_strategy_provider="agent", + agent_strategy="function_calling", + action_invoker=_record_invoker([], captured=captured), + ) + params: dict[str, Any] = {"instruction": "original", "max_iterations": 3} + + list(client.invoke(agent_strategy_params=params)) + params["instruction"] = "mutated-after-invoke" + params["max_iterations"] = 99 + params["extra"] = "should-not-appear" + + assert captured[0][2]["agent_strategy_params"] == { + "instruction": "original", + "max_iterations": 3, + } + + +def test_invoke_via_client_wraps_slim_client_error( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """When the real ``SlimClient`` path raises ``SlimClientError`` (for any + reason — subprocess failure, payload serialization rejection, malformed + chunks), ``SlimAgentStrategyClient`` wraps it into + ``SlimAgentStrategyError``. + + This is the counterpart to + ``test_invoke_raises_when_action_invoker_yields_slim_client_error``: the + two paths have intentionally different error-handling contracts because + the ``action_invoker`` is caller-owned test infrastructure that should + propagate its own errors unchanged, while the ``SlimClient`` path is + library-owned and normalizes errors to a single typed boundary. + """ + fake_binary = tmp_path / "fake-slim" + fake_binary.write_text("#!/bin/sh\nexit 0\n") + fake_binary.chmod(0o755) + monkeypatch.setenv("SLIM_BINARY_PATH", str(fake_binary)) + + error_message = "slim subprocess crashed mid-payload" + + def failing_invoke_chunks( + self: SlimClient, + *, + plugin_id: str, + action: str, + data: Mapping[str, Any], + ) -> Iterable[Any]: + _ = self, plugin_id, action, data + raise SlimClientError(error_message) + + monkeypatch.setattr(SlimClient, "invoke_chunks", failing_invoke_chunks) + + client = SlimAgentStrategyClient( + config=_config(tmp_path), + plugin_id="plugin", + agent_strategy_provider="agent", + agent_strategy="function_calling", + # action_invoker is None -> go through the real SlimClient path + ) + + with pytest.raises(SlimAgentStrategyError, match=error_message): + list(client.invoke(agent_strategy_params={})) + + +def test_invoke_via_client_wraps_non_mapping_chunk( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The ``_client`` path defends against non-Mapping chunk payloads by + raising ``SlimAgentStrategyError``. The action_invoker path returns + ``Iterable[Mapping[str, Any]]`` and has no such check (its type is the + contract). + """ + fake_binary = tmp_path / "fake-slim" + fake_binary.write_text("#!/bin/sh\nexit 0\n") + fake_binary.chmod(0o755) + monkeypatch.setenv("SLIM_BINARY_PATH", str(fake_binary)) + + def bogus_invoke_chunks( + self: SlimClient, + *, + plugin_id: str, + action: str, + data: Mapping[str, Any], + ) -> Iterable[Any]: + _ = self, plugin_id, action, data + yield "not-a-mapping" + + monkeypatch.setattr(SlimClient, "invoke_chunks", bogus_invoke_chunks) + + client = SlimAgentStrategyClient( + config=_config(tmp_path), + plugin_id="plugin", + agent_strategy_provider="agent", + agent_strategy="function_calling", + ) + + with pytest.raises( + SlimAgentStrategyError, match="Unexpected slim agent_strategy chunk" + ): + list(client.invoke(agent_strategy_params={})) + + +def test_agent_runtime_message_is_tool_runtime_message_alias() -> None: + """The ``AgentRuntimeMessage`` PEP 695 alias resolves to + ``ToolRuntimeMessage`` at runtime. + + PEP 695 ``type X = Y`` creates a ``TypeAliasType`` wrapper rather than + rebinding ``X`` to ``Y`` directly, so ``is`` does not hold; the underlying + type is exposed via ``__value__``. This is the semantic-naming contract + that PR-B and other agent consumers depend on. + """ + assert AgentRuntimeMessage.__value__ is ToolRuntimeMessage diff --git a/tests/dsl/test_slim_agent_node_runtime.py b/tests/dsl/test_slim_agent_node_runtime.py new file mode 100644 index 0000000..a3aa62f --- /dev/null +++ b/tests/dsl/test_slim_agent_node_runtime.py @@ -0,0 +1,147 @@ +from __future__ import annotations + +from collections.abc import Iterable, Mapping +from pathlib import Path +from typing import Any + +from graphon.dsl._provider import canonical_vendor +from graphon.dsl.agent_runtime import SlimAgentNodeRuntime +from graphon.dsl.slim.client import SlimClientConfig +from graphon.nodes.agent.entities import AgentNodeData +from graphon.nodes.tool_runtime_entities import ToolRuntimeMessage +from graphon.runtime.variable_pool import VariablePool + + +def _node_data( + *, + provider_name: str = "langgenius/agent/agent", + strategy: str = "function_calling", + plugin_id: str = "langgenius/agent:0.0.36@hash", +) -> AgentNodeData: + return AgentNodeData( + title="Agent", + agent_strategy_provider_name=provider_name, + agent_strategy_name=strategy, + plugin_unique_identifier=plugin_id, + ) + + +def _record_invoker( + chunks: list[Mapping[str, Any]], + *, + captured: list[tuple[str, str, Mapping[str, Any]]], +) -> Any: + def invoker( + plugin_id: str, + action: str, + data: Mapping[str, Any], + ) -> Iterable[Mapping[str, Any]]: + captured.append((plugin_id, action, dict(data))) + return list(chunks) + + return invoker + + +def test_canonical_vendor_extracts_trailing_segment() -> None: + assert canonical_vendor("langgenius/agent/agent") == "agent" + assert canonical_vendor("openai") == "openai" + assert canonical_vendor("//x//") == "x" + assert canonical_vendor("") is None + + +def test_invoke_forwards_payload_with_plugin_id_and_strategy(tmp_path: Path) -> None: + captured: list[tuple[str, str, Mapping[str, Any]]] = [] + runtime = SlimAgentNodeRuntime( + config=SlimClientConfig(folder=tmp_path), + action_invoker=_record_invoker([], captured=captured), + ) + + list( + runtime.invoke( + node_id="agent", + node_data=_node_data(), + agent_strategy_params={"instruction": "test", "query": "hi"}, + variable_pool=VariablePool.from_bootstrap(), + ), + ) + + assert captured == [ + ( + "langgenius/agent:0.0.36@hash", + "invoke_agent_strategy", + { + "agent_strategy_provider": "agent", + "agent_strategy": "function_calling", + "agent_strategy_params": {"instruction": "test", "query": "hi"}, + }, + ), + ] + + +def test_invoke_decodes_text_messages_through_shared_decoder(tmp_path: Path) -> None: + captured: list[tuple[str, str, Mapping[str, Any]]] = [] + runtime = SlimAgentNodeRuntime( + config=SlimClientConfig(folder=tmp_path), + action_invoker=_record_invoker( + [{"type": "text", "message": {"text": "hello"}}], + captured=captured, + ), + ) + + messages = list( + runtime.invoke( + node_id="agent", + node_data=_node_data(), + agent_strategy_params={}, + variable_pool=None, + ), + ) + + assert len(messages) == 1 + assert messages[0].type == ToolRuntimeMessage.MessageType.TEXT + assert isinstance(messages[0].message, ToolRuntimeMessage.TextMessage) + assert messages[0].message.text == "hello" + + +def test_invoke_uses_per_node_strategy_identifiers(tmp_path: Path) -> None: + """Each call freshly assembles a SlimAgentStrategyClient from node_data, + so the same runtime instance can serve different agent nodes. + """ + captured: list[tuple[str, str, Mapping[str, Any]]] = [] + runtime = SlimAgentNodeRuntime( + config=SlimClientConfig(folder=tmp_path), + action_invoker=_record_invoker([], captured=captured), + ) + + list( + runtime.invoke( + node_id="agent-a", + node_data=_node_data( + provider_name="vendor/plugin/react_provider", + strategy="ReAct", + plugin_id="vendor/plugin:1.0@aaa", + ), + agent_strategy_params={}, + variable_pool=None, + ), + ) + list( + runtime.invoke( + node_id="agent-b", + node_data=_node_data( + provider_name="vendor/plugin/fc_provider", + strategy="function_calling", + plugin_id="vendor/plugin:1.0@bbb", + ), + agent_strategy_params={}, + variable_pool=None, + ), + ) + + assert len(captured) == 2 + assert captured[0][0] == "vendor/plugin:1.0@aaa" + assert captured[0][2]["agent_strategy_provider"] == "react_provider" + assert captured[0][2]["agent_strategy"] == "ReAct" + assert captured[1][0] == "vendor/plugin:1.0@bbb" + assert captured[1][2]["agent_strategy_provider"] == "fc_provider" + assert captured[1][2]["agent_strategy"] == "function_calling" diff --git a/tests/nodes/agent/__init__.py b/tests/nodes/agent/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nodes/agent/test_agent_node.py b/tests/nodes/agent/test_agent_node.py new file mode 100644 index 0000000..c83bbba --- /dev/null +++ b/tests/nodes/agent/test_agent_node.py @@ -0,0 +1,292 @@ +from __future__ import annotations + +import time +from collections.abc import Generator, Mapping +from dataclasses import dataclass, field +from typing import Any + +from graphon.enums import WorkflowNodeExecutionStatus +from graphon.graph_events.agent import NodeRunAgentLogEvent +from graphon.graph_events.node import ( + NodeRunStreamChunkEvent, + NodeRunSucceededEvent, +) +from graphon.nodes.agent import AgentNode, AgentNodeData, AgentParameterValue +from graphon.nodes.runtime import AgentNodeRuntimeProtocol +from graphon.nodes.tool_runtime_entities import ToolRuntimeMessage +from graphon.runtime.graph_runtime_state import GraphRuntimeState +from graphon.runtime.variable_pool import VariablePool +from tests.helpers.builders import build_graph_init_params, build_variable_pool + + +@dataclass(slots=True) +class _FakeAgentNodeRuntime(AgentNodeRuntimeProtocol): + """Test stub that yields a canned ``ToolRuntimeMessage`` stream.""" + + messages: list[ToolRuntimeMessage] = field(default_factory=list) + captured_params: list[Mapping[str, Any]] = field(default_factory=list) + raise_error: BaseException | None = None + + def invoke( + self, + *, + node_id: str, + node_data: AgentNodeData, + agent_strategy_params: Mapping[str, Any], + variable_pool: VariablePool | None, + ) -> Generator[ToolRuntimeMessage, None, None]: + _ = node_id, node_data, variable_pool + self.captured_params.append(dict(agent_strategy_params)) + if self.raise_error is not None: + raise self.raise_error + yield from self.messages + + +def _text(text: str) -> ToolRuntimeMessage: + return ToolRuntimeMessage( + type=ToolRuntimeMessage.MessageType.TEXT, + message=ToolRuntimeMessage.TextMessage(text=text), + ) + + +def _log( + *, label: str, status: str = "start", data: dict[str, Any] | None = None +) -> ToolRuntimeMessage: + return ToolRuntimeMessage( + type=ToolRuntimeMessage.MessageType.LOG, + message=ToolRuntimeMessage.LogMessage( + id=f"log-{label}", + label=label, + status=ToolRuntimeMessage.LogMessage.LogStatus(status), + data=data or {}, + ), + ) + + +def _json(payload: dict[str, Any]) -> ToolRuntimeMessage: + return ToolRuntimeMessage( + type=ToolRuntimeMessage.MessageType.JSON, + message=ToolRuntimeMessage.JsonMessage(json_object=payload), + ) + + +def _variable(name: str, value: Any) -> ToolRuntimeMessage: + return ToolRuntimeMessage( + type=ToolRuntimeMessage.MessageType.VARIABLE, + message=ToolRuntimeMessage.VariableMessage( + variable_name=name, + variable_value=value, + ), + ) + + +def _node_data( + *, + parameters: dict[str, AgentParameterValue] | None = None, +) -> AgentNodeData: + return AgentNodeData( + title="Agent", + agent_strategy_provider_name="langgenius/agent/agent", + agent_strategy_name="function_calling", + plugin_unique_identifier="langgenius/agent:0.0.36@hash", + agent_parameters=parameters or {}, + ) + + +def _build_agent_node( + *, + runtime: AgentNodeRuntimeProtocol, + parameters: dict[str, AgentParameterValue] | None = None, + variable_pool: VariablePool | None = None, +) -> tuple[AgentNode, GraphRuntimeState]: + state = GraphRuntimeState( + variable_pool=variable_pool or build_variable_pool(), + start_at=time.time(), + ) + node = AgentNode( + node_id="agent", + data=_node_data(parameters=parameters), + graph_init_params=build_graph_init_params(), + graph_runtime_state=state, + runtime=runtime, + ) + return node, state + + +def test_run_streams_text_messages_and_concatenates_outputs() -> None: + runtime = _FakeAgentNodeRuntime( + messages=[_text("Hello "), _text("world!")], + ) + node, _ = _build_agent_node(runtime=runtime) + + events = list(node.run()) + + chunk_events = [e for e in events if isinstance(e, NodeRunStreamChunkEvent)] + assert [e.chunk for e in chunk_events] == ["Hello ", "world!"] + assert all(tuple(e.selector) == ("agent", "text") for e in chunk_events) + + success_events = [e for e in events if isinstance(e, NodeRunSucceededEvent)] + assert len(success_events) == 1 + outputs = success_events[0].node_run_result.outputs + assert outputs["text"] == "Hello world!" + + +def test_run_emits_log_messages_as_agent_log_events() -> None: + runtime = _FakeAgentNodeRuntime( + messages=[ + _log(label="thinking", status="start", data={"step": 1}), + _text("answer"), + _log(label="thinking", status="success", data={"step": 1}), + ], + ) + node, _ = _build_agent_node(runtime=runtime) + + events = list(node.run()) + + log_events = [e for e in events if isinstance(e, NodeRunAgentLogEvent)] + assert len(log_events) == 2 + assert [e.label for e in log_events] == ["thinking", "thinking"] + assert [e.status for e in log_events] == ["start", "success"] + assert log_events[0].data == {"step": 1} + assert all(e.node_id == "agent" for e in log_events) + + +def test_run_collects_json_message_into_outputs() -> None: + runtime = _FakeAgentNodeRuntime( + messages=[ + _text("done"), + _json({"result": 42, "kind": "answer"}), + ], + ) + node, _ = _build_agent_node(runtime=runtime) + + events = list(node.run()) + success = next(e for e in events if isinstance(e, NodeRunSucceededEvent)) + outputs = success.node_run_result.outputs + assert outputs["text"] == "done" + assert outputs["json"] == {"result": 42, "kind": "answer"} + + +def test_run_collects_multiple_json_messages_into_list() -> None: + runtime = _FakeAgentNodeRuntime( + messages=[ + _json({"step": 1}), + _json({"step": 2}), + ], + ) + node, _ = _build_agent_node(runtime=runtime) + + events = list(node.run()) + success = next(e for e in events if isinstance(e, NodeRunSucceededEvent)) + assert success.node_run_result.outputs["json"] == [{"step": 1}, {"step": 2}] + + +def test_run_assigns_variable_messages_to_outputs() -> None: + runtime = _FakeAgentNodeRuntime( + messages=[ + _variable("answer", "ok"), + _variable("hits", 7), + ], + ) + node, _ = _build_agent_node(runtime=runtime) + + events = list(node.run()) + success = next(e for e in events if isinstance(e, NodeRunSucceededEvent)) + outputs = success.node_run_result.outputs + assert outputs["answer"] == "ok" + assert outputs["hits"] == 7 + + +def test_run_resolves_constant_parameter_unchanged() -> None: + runtime = _FakeAgentNodeRuntime(messages=[_text("ok")]) + parameters = { + "instruction": AgentParameterValue(type="constant", value="be concise"), + "max_iterations": AgentParameterValue(type="constant", value=5), + } + node, _ = _build_agent_node(runtime=runtime, parameters=parameters) + + list(node.run()) + + assert runtime.captured_params == [ + {"instruction": "be concise", "max_iterations": 5}, + ] + + +def test_run_resolves_variable_parameter_from_pool() -> None: + pool = build_variable_pool( + variables=[(("start", "query"), "What is the answer?")], + ) + runtime = _FakeAgentNodeRuntime(messages=[_text("ok")]) + parameters = { + "query": AgentParameterValue(type="variable", value=["start", "query"]), + } + node, _ = _build_agent_node( + runtime=runtime, + parameters=parameters, + variable_pool=pool, + ) + + list(node.run()) + + assert runtime.captured_params == [{"query": "What is the answer?"}] + + +def test_run_resolves_mixed_parameter_via_template() -> None: + pool = build_variable_pool( + variables=[(("start", "name"), "Ben")], + ) + runtime = _FakeAgentNodeRuntime(messages=[_text("ok")]) + parameters = { + "instruction": AgentParameterValue( + type="mixed", + value="Hello {{#start.name#}}, please respond.", + ), + } + node, _ = _build_agent_node( + runtime=runtime, + parameters=parameters, + variable_pool=pool, + ) + + list(node.run()) + + assert runtime.captured_params == [ + {"instruction": "Hello Ben, please respond."}, + ] + + +def test_run_omits_missing_variable_parameter_silently() -> None: + pool = build_variable_pool() + runtime = _FakeAgentNodeRuntime(messages=[_text("ok")]) + parameters = { + "query": AgentParameterValue(type="variable", value=["start", "missing"]), + } + node, _ = _build_agent_node( + runtime=runtime, + parameters=parameters, + variable_pool=pool, + ) + + list(node.run()) + + assert runtime.captured_params == [{}] + + +def test_run_reports_failure_when_runtime_raises() -> None: + runtime = _FakeAgentNodeRuntime(raise_error=RuntimeError("strategy crashed")) + node, _ = _build_agent_node(runtime=runtime) + + events = list(node.run()) + success = [e for e in events if isinstance(e, NodeRunSucceededEvent)] + assert success == [] + failure_events = [ + e for e in events if getattr(e, "node_run_result", None) is not None + ] + # Look at the final node result to confirm failure was reported. + failed = next( + e + for e in failure_events + if e.node_run_result.status == WorkflowNodeExecutionStatus.FAILED + ) + assert "strategy crashed" in failed.node_run_result.error + assert failed.node_run_result.error_type == "RuntimeError" From 07188736b162197a4a852afd4ff367f80027d80f Mon Sep 17 00:00:00 2001 From: Benjamin Date: Tue, 12 May 2026 09:27:12 +0800 Subject: [PATCH 2/5] feat(examples): runnable chatflow_dsl_runner with diagnostics and streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Standard downstream-integration pattern for executing Dify Studio exported chatflow / workflow DSLs through ``graphon.dsl.loads``. What ``main.py`` demonstrates end-to-end: - Static DSL inspection via ``graphon.dsl.inspect`` before execution, printing document kind, plugin dependencies, and load status. Aborts early with a readable diagnostic for non-loadable plans (unsupported node types, config-only ``app.mode`` like ``chat`` / ``completion`` / ``agent-chat``, unresolvable plugin dependencies). - Execution via ``graphon.dsl.loads`` — the canonical 4-line integration surface; everything else in main.py is decoration. - Live event consumption: ``NodeRunStreamChunkEvent`` writes chunks to stdout as they arrive, ``NodeRunStartedEvent`` shows per-node lifecycle, ``NodeRunAgentLogEvent`` exposes agent strategy inner steps, ``GraphRunSucceededEvent`` collects the final ``outputs["answer"]``. - Credential isolation: keys live only in ``credentials.json`` (gitignored via the repo-wide ``examples/*/credentials.json`` rule). No ambient environment variables are consulted for secrets. - Slim binary auto-discovery: ``SLIM_BINARY_PATH`` env var wins, then a local ``./slim`` file in the example directory, otherwise rely on ``PATH``. Includes ``credentials.example.json`` matching the upstream ``examples/slim_llm`` convention so credential plumbing is consistent across demos. Multi-vendor template lists both ``tongyi`` (DashScope) and ``openai`` (with custom ``openai_api_base`` + ``api_protocol`` fields exposed by the ``langgenius/openai`` plugin). README is a standard usage reference for downstream integrators: observed event types, the 4-line integration core, supported / unsupported node types and modes, troubleshooting table. Also extends ``.gitignore`` to cover ``.scratch/`` (local design / review notes) and ``examples/*/credentials.json`` (per-example secrets, never committed). Co-Authored-By: Claude Opus 4.7 (1M context) --- .gitignore | 11 +- examples/chatflow_dsl_runner/README.md | 185 ++++++++++++ examples/chatflow_dsl_runner/__init__.py | 0 .../chatflow_dsl_agent.yml | 280 ++++++++++++++++++ .../chatflow_dsl_simple.yml | 165 +++++++++++ .../credentials.example.json | 20 ++ examples/chatflow_dsl_runner/main.py | 243 +++++++++++++++ 7 files changed, 903 insertions(+), 1 deletion(-) create mode 100644 examples/chatflow_dsl_runner/README.md create mode 100644 examples/chatflow_dsl_runner/__init__.py create mode 100644 examples/chatflow_dsl_runner/chatflow_dsl_agent.yml create mode 100644 examples/chatflow_dsl_runner/chatflow_dsl_simple.yml create mode 100644 examples/chatflow_dsl_runner/credentials.example.json create mode 100644 examples/chatflow_dsl_runner/main.py diff --git a/.gitignore b/.gitignore index 090cfd6..f13f371 100644 --- a/.gitignore +++ b/.gitignore @@ -29,9 +29,18 @@ coverage.xml *.cover *.py,cover .hypothesis/ -.pytest_cache/ cover/ # IDE configs .idea/ .vscode/ + +# Claude +.claude/ +claude_cache/ + +# Local scratch / drafts +.scratch/ + +# Local credentials for examples (only example.json is tracked) +examples/*/credentials.json diff --git a/examples/chatflow_dsl_runner/README.md b/examples/chatflow_dsl_runner/README.md new file mode 100644 index 0000000..1062a34 --- /dev/null +++ b/examples/chatflow_dsl_runner/README.md @@ -0,0 +1,185 @@ +# Chatflow DSL Runner + +Canonical pattern for executing a **Dify Studio exported chatflow / workflow DSL** through `graphon.dsl.loads` from an external Python process. + +Unlike the sibling `examples/slim_llm` example — which carries its own simplified `graph.yml` — this runner takes a real Dify export YAML as input, performs static inspection, and streams the run to stdout. + +Two bundled fixtures are provided: + +| Fixture | Shape | Runs out of the box? | +| --- | --- | --- | +| `chatflow_dsl_simple.yml` | `start → llm → answer` | **Yes.** Only requires the `langgenius/openai` plugin and a model key. | +| `chatflow_dsl_agent.yml` | `start → llm → answer` + `llm → agent → answer` | **No** — see [Architecture & limitations](#architecture--limitations). Requires a running Dify control plane. | + +## What this demo shows + +| Capability | Where in the code | +| --- | --- | +| Static DSL inspection before run | `diagnose()` calls `graphon.dsl.inspect` | +| Loading + executing a DSL | `run_workflow()` calls `graphon.dsl.loads` | +| Live streaming of LLM / Answer chunks | `NodeRunStreamChunkEvent` handler | +| Lifecycle visibility (per-node start / fail; graph succeed / fail) | event loop | +| Agent strategy log forwarding | `NodeRunAgentLogEvent` handler | +| Credential isolation (no env vars for secrets) | `credentials.json` | +| Slim binary auto-discovery | `setup_slim_binary()` | + +## Requirements + +- Everything in the root `README.md` (Python 3.12/3.13, `uv`, `make`). +- A built `dify-plugin-daemon-slim` binary — either on `PATH`, pointed at via `SLIM_BINARY_PATH`, or dropped into this directory as `./slim`. Build it from [`langgenius/dify-plugin-daemon`](https://github.com/langgenius/dify-plugin-daemon): + + ```bash + go build -o /path/to/dify-plugin-daemon-slim ./cmd/slim + ``` + +- Plugin packages declared in a DSL's top-level `dependencies:` are auto-downloaded from `marketplace.dify.ai` on first use (cached under the directory specified in `credentials.json::slim.plugin_folder`). + - `chatflow_dsl_simple.yml` declares `langgenius/openai` — auto-downloaded. + - `chatflow_dsl_agent.yml` declares `langgenius/openai` likewise. The `langgenius/agent` strategy plugin is referenced on the agent node itself, and `langgenius/json_process` is referenced inside the agent node's `tools` parameter. See [Architecture & limitations](#architecture--limitations) for how those are expected to be installed. + +## Run + +```bash +cd examples/chatflow_dsl_runner +cp credentials.example.json credentials.json +# edit credentials.json: fill in your openai_api_key (and openai_api_base if you proxy the API) + +# If your slim binary is not on PATH: +export SLIM_BINARY_PATH=/abs/path/to/dify-plugin-daemon-slim +# OR drop a symlink in this directory: +# ln -s /abs/path/to/dify-plugin-daemon-slim ./slim + +# Out-of-the-box: simple start → llm → answer chatflow +python3 main.py chatflow_dsl_simple.yml "Hello, please introduce yourself." + +# Full fixture (see "Architecture & limitations" below — this one needs a running Dify Server + plugin daemon): +python3 main.py chatflow_dsl_agent.yml "Hello, please introduce yourself." + +# Or pass any other Dify chatflow YAML: +python3 main.py /path/to/your-chatflow.yml "Hello, please introduce yourself." +``` + +### Quick reference: the canonical 4-line core + +After credential setup and DSL inspection, the entire DSL-driven run is: + +```python +engine = loads( + dsl_text, + credentials=credentials, + workflow_id="chatflow-dsl-runner", + start_inputs={"query": query}, +) +for event in engine.run(): + ... # consume events +``` + +Everything else in `main.py` is **decoration** — diagnostics, streaming, credential plumbing. The integration surface is exactly those 4 lines. + +## Output shape + +``` +╭─ DSL inspection ──────── +│ kind: app +│ status: loadable +│ deps: 1 plugin(s) +│ - langgenius/openai:0.4.0@ +╰───────────────── + +> Graph run started + > [start] User Input + > [llm] LLM +Hello! I am ... (streamed chunks) + > [answer] Response + +[OK] Graph run succeeded + +── Final answer ───────── +Hello! I am ... +``` + +## Architecture & limitations + +`graphon` is a **pure execution engine** — it knows about nodes, edges, the slim plugin runtime, and how to stream events. It deliberately does **not** know about tenants, workspaces, model-provider catalogues, or quota: those are control-plane concerns owned by Dify Server. + +That separation has a load-bearing consequence for the `agent` node. + +### Why `chatflow_dsl_simple.yml` runs out of the box + +A plain `llm` node calls a model-provider plugin (e.g. `langgenius/openai`) through the slim runtime. The plugin's `invoke_llm` action is a single forward call: stdin → plugin → LLM API → stdout. No callbacks into Dify Server are required, so local slim mode is sufficient: + +``` +graphon ── stdin/stdout NDJSON ──▶ slim binary ──▶ openai plugin ──▶ LLM +``` + +### Why `chatflow_dsl_agent.yml` does **not** run out of the box + +The `langgenius/agent` plugin implements its strategy logic in Python, and inside that logic it does `self.session.model.llm.invoke(...)` to call the LLM. That call is a **nested plugin invocation** — the agent plugin asks the runtime to dispatch another plugin (the model provider) on its behalf. In production Dify this is implemented by: + +``` +agent plugin ──▶ daemon backwards_invocation + ──▶ POST http://dify-api:5001/inner/api/invoke/llm + ──▶ Dify Server validates tenant + provider config + ──▶ Dify Server dispatches openai plugin via daemon + ──▶ response streams back up the chain +``` + +The 5001 inner API is part of Dify Server, and it requires a **tenant context** (provider credentials are scoped per workspace). Standing up that whole control plane just to drive the agent node from a graphon-only runner means: + +1. Run `dify-plugin-daemon` (not just the slim binary) at e.g. `:5002`. +2. Run Dify Server at `:5001` with a configured tenant + OpenAI provider. +3. Install both `langgenius/agent` and `langgenius/openai` into the daemon for that tenant. +4. Some path for backwards-invocation requests originating from the agent plugin to reach Dify Server with a valid tenant context. + +The fourth point is where the architectural collision sits. The natural implementation — teaching the slim runtime to forward `tenant_id` / `user_id` — would leak Dify's business-side identity model into a tenant-agnostic execution engine, which is **explicitly rejected** here. + +The correct long-term fix is described in [RFC #102](https://github.com/langgenius/graphon/issues/102): re-shape the daemon so plugin code can perform nested invocations without depending on Dify Server's inner API. **Until that lands, the agent fixture is included as a structural reference — it is what a real Dify Studio export of a chatflow with an agent looks like — not as something `main.py` can drive end-to-end on its own.** + +## What is and is not supported + +The upstream `graphon.dsl` importer accepts: + +- `kind: app` Dify exports where `app.mode in {workflow, advanced-chat}` +- `kind: graph` simplified DSL (see `examples/slim_llm/graph.yml`) + +Supported **node types** (built into `SlimDslNodeFactory`, including this fork's added `agent` routing): + +`start`, `end`, `answer`, `llm`, `if-else`, `template-transform`, `code`, `tool`, `agent` + +**Not supported by this demo** (the inspector will flag them): + +- `knowledge-retrieval`, `datasource` — RAG path not in scope. +- `app.mode in {chat, completion, agent-chat}` — these are config-only, no executable graph exists. The inspector rejects them. + +## Integrating into your own product + +The pattern in `main.py` is what you want for an SDK-style integration: + +1. Keep `credentials.json` schema simple and explicit — pass it whole to `loads`. No environment-variable spelunking. +2. Hold on to the `GraphEngine` object — you can call `engine.run()` in a thread and bridge events to an SSE stream / websocket / async queue. +3. Use `inspect()` *before* `loads()` to give the operator/admin a readable plan and refuse misconfigurations early. +4. Match each event in the run loop and translate it to your product's surface (chat bubble, log line, billing meter, trace span). + +## Troubleshooting + +| Symptom | Likely cause | +| --- | --- | +| `Missing credentials.json` | You didn't `cp credentials.example.json credentials.json` | +| `SLIM_BINARY_PATH points to a missing file` | Path you set does not exist or is not executable | +| `dify-plugin-daemon-slim is not available in PATH` | Build the slim binary and either add it to PATH or set `SLIM_BINARY_PATH` | +| `Variable #sys.files# not found` | Your DSL's LLM node references `sys.files` but `main.py` only seeds `query`. `main.py` seeds an empty `files` list by default; if your DSL needs more system vars, extend `_DEFAULT_START_INPUTS`. | +| `DSL not loadable. ... Unsupported node types: ...` | The named node is not supported (e.g. `knowledge-retrieval`, `datasource`), or the DSL's `app.mode` is config-only. | +| `tenant not found` / `Provider ... does not exist` / agent node hangs | You're running `chatflow_dsl_agent.yml` without a Dify Server control plane. See [Architecture & limitations](#architecture--limitations) — this fixture is not runnable in local slim mode. | +| Plugin download takes minutes on first run | Marketplace download + `uv` env init for the plugin's Python venv. Subsequent runs hit the cache. | + +## Layout + +``` +chatflow_dsl_runner/ +├── README.md # this file +├── main.py # CLI entrypoint +├── chatflow_dsl_simple.yml # runnable fixture: start → llm → answer +├── chatflow_dsl_agent.yml # reference fixture with agent node (see limitations) +├── credentials.example.json # config template (commit-safe) +├── credentials.json # your real keys (gitignored) +└── __init__.py +``` diff --git a/examples/chatflow_dsl_runner/__init__.py b/examples/chatflow_dsl_runner/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/chatflow_dsl_runner/chatflow_dsl_agent.yml b/examples/chatflow_dsl_runner/chatflow_dsl_agent.yml new file mode 100644 index 0000000..aa9e3fd --- /dev/null +++ b/examples/chatflow_dsl_runner/chatflow_dsl_agent.yml @@ -0,0 +1,280 @@ +app: + description: '' + icon: heart_eyes + icon_background: '#FFEAD5' + icon_type: emoji + mode: advanced-chat + name: Chatflow-Agent-Demo + use_icon_as_answer_icon: false +dependencies: +- current_identifier: null + type: marketplace + value: + marketplace_plugin_unique_identifier: langgenius/openai:0.4.0@beafb5a726eda839a1839f61a0456ae7e068c98624c53f59b07be9a71fbf72da + version: null +kind: app +version: 0.6.0 +workflow: + conversation_variables: [] + environment_variables: [] + features: + file_upload: + allowed_file_extensions: + - .JPG + - .JPEG + - .PNG + - .GIF + - .WEBP + - .SVG + allowed_file_types: + - image + allowed_file_upload_methods: + - local_file + - remote_url + enabled: false + fileUploadConfig: + attachment_image_file_size_limit: 2 + audio_file_size_limit: 100 + batch_count_limit: 20 + file_size_limit: 150 + file_upload_limit: 20 + image_file_batch_limit: 10 + image_file_size_limit: 20 + single_chunk_attachment_limit: 10 + video_file_size_limit: 200 + workflow_file_upload_limit: 10 + image: + enabled: false + number_limits: 3 + transfer_methods: + - local_file + - remote_url + number_limits: 3 + opening_statement: '' + retriever_resource: + enabled: true + sensitive_word_avoidance: + enabled: false + speech_to_text: + enabled: false + suggested_questions: [] + suggested_questions_after_answer: + enabled: false + text_to_speech: + enabled: false + language: '' + voice: '' + graph: + edges: + - data: + sourceType: start + targetType: llm + id: 1777208318154-llm + source: '1777208318154' + sourceHandle: source + target: llm + targetHandle: target + type: custom + - data: + sourceType: llm + targetType: answer + id: llm-answer + source: llm + sourceHandle: source + target: answer + targetHandle: target + type: custom + - data: + isInIteration: false + isInLoop: false + sourceType: llm + targetType: agent + id: llm-source-1777208323342-target + source: llm + sourceHandle: source + target: '1777208323342' + targetHandle: target + type: custom + zIndex: 0 + - data: + isInIteration: false + isInLoop: false + sourceType: agent + targetType: answer + id: 1777208323342-source-1777208328174-target + source: '1777208323342' + sourceHandle: source + target: '1777208328174' + targetHandle: target + type: custom + zIndex: 0 + nodes: + - data: + selected: false + title: User Input + type: start + variables: [] + height: 73 + id: '1777208318154' + position: + x: 80 + y: 282 + positionAbsolute: + x: 80 + y: 282 + sourcePosition: right + targetPosition: left + type: custom + width: 242 + - data: + context: + enabled: false + variable_selector: [] + memory: + query_prompt_template: '{{#sys.query#}} + + + {{#sys.files#}}' + role_prefix: + assistant: '' + user: '' + window: + enabled: false + size: 10 + model: + completion_params: + temperature: 0.7 + mode: chat + name: gpt-5.5 + provider: langgenius/openai/openai + prompt_template: + - id: de7fac2d-db9a-473e-80c9-7fe5cf614c33 + role: system + text: '' + selected: true + title: LLM + type: llm + vision: + enabled: false + height: 88 + id: llm + position: + x: 380 + y: 282 + positionAbsolute: + x: 380 + y: 282 + selected: true + sourcePosition: right + targetPosition: left + type: custom + width: 242 + - data: + answer: '{{#llm.text#}}' + selected: false + title: Response + type: answer + variables: [] + height: 103 + id: answer + position: + x: 680 + y: 282 + positionAbsolute: + x: 680 + y: 282 + selected: false + sourcePosition: right + targetPosition: left + type: custom + width: 242 + - data: + agent_parameters: + instruction: + type: constant + value: Testing + model: + type: constant + value: + completion_params: {} + mode: chat + model: gpt-5.5 + model_type: llm + provider: langgenius/openai/openai + type: model-selector + query: + type: constant + value: '' + tools: + type: constant + value: + - enabled: true + extra: + description: + parameters: + content: + auto: 1 + value: null + json_filter: + auto: 1 + value: null + provider_name: langgenius/json_process/json_process + provider_show_name: langgenius/json_process/json_process + settings: + ensure_ascii: + value: + type: constant + value: true + tool_description: A tool that parses JSON objects + tool_label: JSON Parse + tool_name: parse + type: builtin + agent_strategy_label: FunctionCalling + agent_strategy_name: function_calling + agent_strategy_provider_name: langgenius/agent/agent + meta: + minimum_dify_version: 1.7.0 + version: 0.0.2 + output_schema: {} + plugin_unique_identifier: langgenius/agent:0.0.37@a5dcc6ea00bca23439b49ff7d65704f3f5dd6ce2ca353205e62278e2148d84b6 + selected: false + title: Agent + tool_node_version: '2' + type: agent + height: 188 + id: '1777208323342' + position: + x: 680 + y: 424 + positionAbsolute: + x: 680 + y: 424 + selected: false + sourcePosition: right + targetPosition: left + type: custom + width: 242 + - data: + answer: Agent + selected: false + title: Response 2 + type: answer + variables: [] + height: 100 + id: '1777208328174' + position: + x: 982 + y: 424 + positionAbsolute: + x: 982 + y: 424 + selected: false + sourcePosition: right + targetPosition: left + type: custom + width: 242 + viewport: + x: 296 + y: 484 + zoom: 1 + rag_pipeline_variables: [] diff --git a/examples/chatflow_dsl_runner/chatflow_dsl_simple.yml b/examples/chatflow_dsl_runner/chatflow_dsl_simple.yml new file mode 100644 index 0000000..9f7f3ad --- /dev/null +++ b/examples/chatflow_dsl_runner/chatflow_dsl_simple.yml @@ -0,0 +1,165 @@ +app: + description: '' + icon: speech_balloon + icon_background: '#E6F4FF' + icon_type: emoji + mode: advanced-chat + name: Chatflow-Simple-Demo + use_icon_as_answer_icon: false +dependencies: +- current_identifier: null + type: marketplace + value: + marketplace_plugin_unique_identifier: langgenius/openai:0.4.0@beafb5a726eda839a1839f61a0456ae7e068c98624c53f59b07be9a71fbf72da + version: null +kind: app +version: 0.6.0 +workflow: + conversation_variables: [] + environment_variables: [] + features: + file_upload: + allowed_file_extensions: + - .JPG + - .JPEG + - .PNG + - .GIF + - .WEBP + - .SVG + allowed_file_types: + - image + allowed_file_upload_methods: + - local_file + - remote_url + enabled: false + fileUploadConfig: + attachment_image_file_size_limit: 2 + audio_file_size_limit: 100 + batch_count_limit: 20 + file_size_limit: 150 + file_upload_limit: 20 + image_file_batch_limit: 10 + image_file_size_limit: 20 + single_chunk_attachment_limit: 10 + video_file_size_limit: 200 + image: + enabled: false + number_limits: 3 + transfer_methods: + - local_file + - remote_url + number_limits: 3 + opening_statement: '' + retriever_resource: + enabled: true + sensitive_word_avoidance: + enabled: false + speech_to_text: + enabled: false + suggested_questions: [] + suggested_questions_after_answer: + enabled: false + text_to_speech: + enabled: false + language: '' + voice: '' + graph: + edges: + - data: + sourceType: start + targetType: llm + id: start-llm + source: '1777208318154' + sourceHandle: source + target: llm + targetHandle: target + type: custom + - data: + sourceType: llm + targetType: answer + id: llm-answer + source: llm + sourceHandle: source + target: answer + targetHandle: target + type: custom + nodes: + - data: + selected: false + title: User Input + type: start + variables: [] + height: 73 + id: '1777208318154' + position: + x: 80 + y: 282 + positionAbsolute: + x: 80 + y: 282 + sourcePosition: right + targetPosition: left + type: custom + width: 242 + - data: + context: + enabled: false + variable_selector: [] + memory: + query_prompt_template: '{{#sys.query#}}' + role_prefix: + assistant: '' + user: '' + window: + enabled: false + size: 10 + model: + completion_params: + temperature: 0.7 + mode: chat + name: gpt-5.5 + provider: langgenius/openai/openai + prompt_template: + - id: de7fac2d-db9a-473e-80c9-7fe5cf614c33 + role: system + text: You are a helpful assistant. Answer concisely. + selected: false + title: LLM + type: llm + vision: + enabled: false + height: 88 + id: llm + position: + x: 380 + y: 282 + positionAbsolute: + x: 380 + y: 282 + sourcePosition: right + targetPosition: left + type: custom + width: 242 + - data: + answer: '{{#llm.text#}}' + selected: false + title: Response + type: answer + variables: [] + height: 103 + id: answer + position: + x: 680 + y: 282 + positionAbsolute: + x: 680 + y: 282 + sourcePosition: right + targetPosition: left + type: custom + width: 242 + viewport: + x: 0 + y: 0 + zoom: 1 + rag_pipeline_variables: [] diff --git a/examples/chatflow_dsl_runner/credentials.example.json b/examples/chatflow_dsl_runner/credentials.example.json new file mode 100644 index 0000000..52b5a7d --- /dev/null +++ b/examples/chatflow_dsl_runner/credentials.example.json @@ -0,0 +1,20 @@ +{ + "slim": { + "mode": "local", + "plugin_folder": ".slim/plugins", + "daemon_addr": "", + "daemon_key": "" + }, + "model_credentials": [ + { + "vendor": "openai", + "values": { + "openai_api_key": "", + "openai_api_base": "", + "openai_organization": "", + "api_protocol": "responses" + } + } + ], + "tool_credentials": [] +} diff --git a/examples/chatflow_dsl_runner/main.py b/examples/chatflow_dsl_runner/main.py new file mode 100644 index 0000000..acc6a28 --- /dev/null +++ b/examples/chatflow_dsl_runner/main.py @@ -0,0 +1,243 @@ +"""Run a Dify chatflow DSL through ``graphon.dsl`` with diagnostics and streaming. + +This is the canonical downstream-integrator pattern: feed a Dify Studio +exported workflow / advanced-chat YAML to ``graphon.dsl.loads`` and observe +events. The script is intentionally short — most of the work is delegated to +upstream graphon. + +Run:: + + cd examples/chatflow_dsl_runner + cp credentials.example.json credentials.json + # fill in the keys you need (the bundled fixture uses Alibaba Tongyi) + + python3 main.py /path/to/your-chatflow.yml "Hello, please introduce yourself." + +The script performs three steps: + +1. **Inspect** the DSL with ``graphon.dsl.inspect`` for a static plan + (document kind, plugin dependencies, load status). Aborts early with a + readable diagnostic when the DSL is not loadable (unsupported node type, + missing plugin declaration, etc.). +2. **Load** the DSL with ``graphon.dsl.loads`` into a ``GraphEngine``. +3. **Run** the engine and react to its event stream: + + - ``GraphRunStartedEvent`` / ``GraphRunSucceededEvent`` / ``GraphRunFailedEvent`` + mark the workflow lifecycle. + - ``NodeRunStartedEvent`` / ``NodeRunSucceededEvent`` / ``NodeRunFailedEvent`` + mark each node. + - ``NodeRunStreamChunkEvent`` carries LLM/Answer chunks — written to + stdout as they arrive so the user sees streaming output. + - ``NodeRunAgentLogEvent`` reports Agent strategy inner steps (forwarded + from the slim ``invoke_agent_strategy`` action once an ``AgentNode`` + implementation is in place upstream). + +Credential isolation: keys live only in ``credentials.json`` (gitignored via +the repo-wide ``examples/*/credentials.json`` rule). The script never reads +ambient environment variables for API keys. +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +from pathlib import Path +from typing import Any + +if __package__ in {None, ""}: + sys.path.insert(0, str(Path(__file__).resolve().parents[2])) + +from graphon.dsl import inspect, loads +from graphon.dsl.entities import LoadStatus +from graphon.graph_events.agent import NodeRunAgentLogEvent +from graphon.graph_events.graph import ( + GraphRunFailedEvent, + GraphRunStartedEvent, + GraphRunSucceededEvent, +) +from graphon.graph_events.node import ( + NodeRunFailedEvent, + NodeRunStartedEvent, + NodeRunStreamChunkEvent, +) + +HERE = Path(__file__).resolve().parent +CREDENTIALS_FILE = HERE / "credentials.json" +CREDENTIALS_EXAMPLE_FILE = HERE / "credentials.example.json" +LOCAL_SLIM_BINARY = HERE / "slim" +DEFAULT_QUERY = "Hello! Please introduce yourself in one short sentence." + +# advanced-chat DSLs commonly reference ``{{#sys.files#}}`` in their LLM +# / agent prompt templates even when no upload happens. The importer maps +# every ``start_inputs`` entry into ``("sys", key)`` selectors, so seeding +# an empty ``files`` list satisfies the template lookup without changing +# the DSL itself. +_DEFAULT_START_INPUTS: dict[str, Any] = {"files": []} + + +def load_credentials(path: Path = CREDENTIALS_FILE) -> dict[str, Any]: + """Read and lightly normalize the credentials JSON. + + Relative paths under ``slim.plugin_folder`` / ``slim.plugin_root`` are + resolved against this script's directory so the file is portable. + + Returns: + The parsed credentials dict ready to pass to ``graphon.dsl.loads``. + + Raises: + FileNotFoundError: If ``credentials.json`` does not exist. + TypeError: If the JSON root is not an object. + """ + if not path.is_file(): + msg = ( + f"Missing {path.name}. Copy {CREDENTIALS_EXAMPLE_FILE.name} " + f"to {path.name} and fill in the required values." + ) + raise FileNotFoundError(msg) + + raw = json.loads(path.read_text(encoding="utf-8")) + if not isinstance(raw, dict): + msg = f"{path.name} must contain a JSON object." + raise TypeError(msg) + + credentials: dict[str, Any] = dict(raw) + slim_raw = credentials.get("slim") + if isinstance(slim_raw, dict): + slim_settings: dict[str, Any] = dict(slim_raw) + for key in ("plugin_folder", "plugin_root"): + value = slim_settings.get(key) + if isinstance(value, str) and value: + slim_settings[key] = _resolve_relative_path(value) + credentials["slim"] = slim_settings + return credentials + + +def _resolve_relative_path(value: str) -> str: + path = Path(value).expanduser() + if path.is_absolute(): + return str(path) + return str((HERE / path).resolve()) + + +def setup_slim_binary() -> None: + """Point ``SLIM_BINARY_PATH`` at a local ``slim`` file if present. + + Matches the convention used by ``examples/slim_llm``: drop a copy or + symlink of ``dify-plugin-daemon-slim`` next to ``main.py`` and the + script picks it up. An explicit ``SLIM_BINARY_PATH`` env var still wins. + """ + if os.environ.get("SLIM_BINARY_PATH"): + return + if LOCAL_SLIM_BINARY.is_file(): + os.environ["SLIM_BINARY_PATH"] = str(LOCAL_SLIM_BINARY) + + +def _emit(line: str = "") -> None: + """Write a line to stdout. Centralized so all CLI output looks the same.""" + sys.stdout.write(line + "\n") + sys.stdout.flush() + + +def diagnose(dsl_text: str) -> None: + """Statically inspect a DSL before execution; abort on non-loadable plans.""" + plan = inspect(dsl_text) + + _emit("╭─ DSL inspection ────────") + _emit(f"│ kind: {plan.document.kind.value}") + _emit(f"│ status: {plan.load_status.value}") + _emit(f"│ deps: {len(plan.dependencies)} plugin(s)") + for dep in plan.dependencies: + identifier = dep.plugin_unique_identifier or "(no identifier)" + _emit(f"│ - {identifier}") + if plan.load_reason: + _emit(f"│ reason: {plan.load_reason}") + _emit("╰─────────────────") + + if plan.load_status is not LoadStatus.LOADABLE: + _emit() + _emit("Cannot load this DSL. Common causes:") + _emit(" - Unsupported node types (e.g. 'knowledge-retrieval' /") + _emit(" 'datasource' — the RAG path is not in scope here).") + _emit(" - The DSL app.mode is config-only (chat / completion /") + _emit(" agent-chat) and has no executable graph.") + _emit(" - A declared plugin dependency cannot be resolved.") + sys.exit(1) + + +def _format_node_label(event: NodeRunStartedEvent) -> str: + title = event.node_title or event.node_id + return f"[{event.node_type}] {title}" + + +def run_workflow(dsl_path: Path, query: str) -> str | None: + """Load + run a DSL, streaming events to stdout. Return the answer string.""" + dsl_text = dsl_path.read_text(encoding="utf-8") + diagnose(dsl_text) + + setup_slim_binary() + credentials = load_credentials() + + engine = loads( + dsl_text, + credentials=credentials, + workflow_id="chatflow-dsl-runner", + start_inputs={**_DEFAULT_START_INPUTS, "query": query}, + ) + + _emit() + final_answer: str | None = None + for event in engine.run(): + if isinstance(event, GraphRunStartedEvent): + _emit("> Graph run started") + elif isinstance(event, NodeRunStartedEvent): + _emit(f" > {_format_node_label(event)}") + elif isinstance(event, NodeRunStreamChunkEvent): + sys.stdout.write(event.chunk) + sys.stdout.flush() + elif isinstance(event, NodeRunAgentLogEvent): + _emit(f"\n * agent log [{event.label}] {event.status}") + elif isinstance(event, NodeRunFailedEvent): + _emit(f"\n ! node {event.node_id} failed: {event.error}") + elif isinstance(event, GraphRunSucceededEvent): + answer = event.outputs.get("answer") + if isinstance(answer, str): + final_answer = answer + _emit("\n[OK] Graph run succeeded") + elif isinstance(event, GraphRunFailedEvent): + _emit(f"\n[FAIL] Graph run failed: {event.error}") + sys.exit(2) + + return final_answer + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Run a Dify chatflow DSL through graphon.dsl.loads", + ) + parser.add_argument( + "dsl_path", + type=Path, + help="Path to a Dify-exported chatflow / workflow DSL YAML.", + ) + parser.add_argument( + "query", + nargs="?", + default=DEFAULT_QUERY, + help=f"User input passed into the start node (default: {DEFAULT_QUERY!r}).", + ) + return parser.parse_args() + + +def main() -> int: + args = parse_args() + answer = run_workflow(args.dsl_path, args.query) + if answer: + _emit("\n── Final answer ─────────") + _emit(answer) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) From 6f028a5e5d0dab2c9598ddaff8e5b2b207ec688f Mon Sep 17 00:00:00 2001 From: Benjamin Date: Tue, 12 May 2026 10:26:06 +0800 Subject: [PATCH 3/5] chore(examples): bump pinned openai plugin in slim_llm to 0.4.0 The previous pin ``langgenius/openai:0.3.8@`` was rejected by the live marketplace with ``plugin package not found`` (500). The upstream ``examples/slim_llm`` demo therefore did not run end-to-end on a fresh ``.slim/plugins`` cache. Updating both the demo's ``graph.yml`` and ``settings.py`` to the current marketplace head ``0.4.0`` (digest ``beafb5a726eda839a1839f61a0456ae7e068c98624c53f59b07be9a71fbf72da``) restores marketplace download. No behavioral change in the demo itself. Co-Authored-By: Claude Opus 4.7 (1M context) --- examples/slim_llm/graph.yml | 2 +- examples/slim_llm/settings.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/slim_llm/graph.yml b/examples/slim_llm/graph.yml index 46cffe9..9684d5d 100644 --- a/examples/slim_llm/graph.yml +++ b/examples/slim_llm/graph.yml @@ -2,7 +2,7 @@ kind: graph dependencies: - type: marketplace value: - marketplace_plugin_unique_identifier: langgenius/openai:0.3.8@592c8252795b5f75807de2d609a03196ed02596b409f7642b4a07548c7ff57ef + marketplace_plugin_unique_identifier: langgenius/openai:0.4.0@beafb5a726eda839a1839f61a0456ae7e068c98624c53f59b07be9a71fbf72da graph: nodes: - id: start diff --git a/examples/slim_llm/settings.py b/examples/slim_llm/settings.py index a7e7c46..1636d87 100644 --- a/examples/slim_llm/settings.py +++ b/examples/slim_llm/settings.py @@ -14,8 +14,8 @@ LOCAL_SLIM_BINARY = HERE / "slim" OPENAI_PLUGIN_ID = ( - "langgenius/openai:0.3.8@" - "592c8252795b5f75807de2d609a03196ed02596b409f7642b4a07548c7ff57ef" + "langgenius/openai:0.4.0@" + "beafb5a726eda839a1839f61a0456ae7e068c98624c53f59b07be9a71fbf72da" ) OPENAI_PROVIDER = "openai" OPENAI_MODEL = "gpt-4o-mini" From ebdd111874968b7123bef0f540f4d811f4f1924d Mon Sep 17 00:00:00 2001 From: WH-2099 Date: Sat, 23 May 2026 18:24:59 +0800 Subject: [PATCH 4/5] chore: remove local-only ignore rules --- .gitignore | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/.gitignore b/.gitignore index f13f371..f3d294b 100644 --- a/.gitignore +++ b/.gitignore @@ -34,13 +34,3 @@ cover/ # IDE configs .idea/ .vscode/ - -# Claude -.claude/ -claude_cache/ - -# Local scratch / drafts -.scratch/ - -# Local credentials for examples (only example.json is tracked) -examples/*/credentials.json From 2aa958d0b87df944a84be0955a78e63b6d1f5b3c Mon Sep 17 00:00:00 2001 From: WH-2099 Date: Sat, 23 May 2026 18:30:21 +0800 Subject: [PATCH 5/5] chore: ignore example credentials --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index f3d294b..2ed419a 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,6 @@ cover/ # IDE configs .idea/ .vscode/ + +# Local credentials for examples +examples/*/credentials.json