
🏗️ Refactoring proposal: turn SoloFlow into a general-purpose agent orchestration framework with native OpenClaw support #3

@SonicBotMan

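Background: core problems in the current version

After a detailed review of the latest code: SoloFlow v1 implements a basic FSM + YAML + SQLite skeleton, but several structural problems keep it from being a general-purpose framework rather than a tool specific to video production:

Problem 1: runner.py hardcodes "video company" domain logic

# runner.py, line 33: the task-planning prompt hardcodes the agent names
"agent": "idea/writer/editor/publisher"

As a result the framework cannot be used directly for other scenarios (e-commerce, customer service, software development); switching to a different business means editing the source.

Problem 2: the agent execution layer has no tool-calling abstraction

run_task() currently makes a plain-text LLM call and nothing more. OpenClaw's core capability is tool use, but today's AgentSkill is only a descriptive record with no actual function-call mechanism:

# agent_loader.py: skills are decorative and never executed
@dataclass
class AgentSkill:
    name: str
    description: str = ""  # a handler / function_schema should live here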
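
One way to close this gap is sketched below. It is only an illustration under assumptions: the `parameters` and `handler` fields and the `to_function_schema` / `invoke` helpers are hypothetical names and are not part of the current SoloFlow code.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class AgentSkill:
    name: str
    description: str = ""
    # Hypothetical: JSON Schema describing the handler's arguments.
    parameters: Dict[str, Any] = field(
        default_factory=lambda: {"type": "object", "properties": {}}
    )
    # Hypothetical: the Python callable that actually performs the skill.
    handler: Optional[Callable[..., Any]] = None

    def to_function_schema(self) -> Dict[str, Any]:
        """Render the skill in the OpenAI-style function-tool shape."""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters,
            },
        }

    def invoke(self, **kwargs: Any) -> Any:
        """Run the handler; fail loudly if the skill is description-only."""
        if self.handler is None:
            raise NotImplementedError(f"skill {self.name!r} has no handler")
        return self.handler(**kwargs)


def word_count(text: str) -> int:
    return len(text.split())


skill = AgentSkill(
    name="word_count",
    description="Count words in a text",
    parameters={
        "type": "object",
        "properties": {"text": {"type": "string"}},
        "required": ["text"],
    },
    handler=word_count,
)
```

With this shape, the same record serves both purposes: `to_function_schema()` feeds the LLM's tool list, and `invoke(**args)` runs the call the model asked for.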

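Problem 3: no context passing between tasks

When several agents run in sequence, a later agent cannot see the previous agent's output. The Task.context field exists, but runner.py never populates it when creating subtasks:

# runner.py, line 93: the previous task's result is not passed to the next one
task = self.fsm.create(
    title=t.get("title", ""),
    description=t.get("description", ""),
    agent=t.get("agent", "assistant")
    # context=??? upstream results are never passed in
)

Problem 4: no way to connect to OpenClaw (the heartbeat + MCP protocol layer is missing)

OpenClaw works like this: receive a task → call MCP tools → report completion through a heartbeat/webhook callback. SoloFlow currently has no MCP server interface and no callback mechanism for asynchronous task completion.

Problem 5: web.py is too heavy and carries too much business logic

soloflow/web.py (11973 bytes) mixes routing, business logic, and HTML rendering, which makes it hard to extend.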


Refactoring goals

Positioning in one sentence: SoloFlow is a "task orchestration core" that is not tied to any vertical scenario; business logic is driven by YAML, and a Driver layer connects it to any agent runtime (direct LLM calls / OpenClaw / Claude Code / HTTP).

Layered architecture after the refactor:

┌─────────────────────────────────────────────────────────┐
│                     Interface Layer                     │
│  FastAPI Web UI  │  CLI  │  Webhook  │  MCP Server      │
└──────────────────────────┬──────────────────────────────┘
                           │
┌──────────────────────────▼──────────────────────────────┐
│                   Orchestration Layer                   │
│     FlowEngine (workflow DAG) + TaskFSM (state machine) │
│     ContextBus (context passing between tasks)          │
└──────────────────────────┬──────────────────────────────┘
                           │
┌──────────────────────────▼──────────────────────────────┐
│                Agent Driver Layer (NEW)                 │
│ LLMDriver │ OpenClawDriver │ HTTPDriver │ MockDriver    │
└──────────────────────────┬──────────────────────────────┘
                           │
┌──────────────────────────▼──────────────────────────────┐
│                  Config / Memory Layer                  │
│     YAML AgentLoader  +  PreferenceMemory (SQLite)      │
└─────────────────────────────────────────────────────────┘

Detailed refactoring plan and code

1. Add soloflow/drivers/: the Agent Driver abstraction layer

This is the core innovation of the refactor, and it solves the OpenClaw integration problem.

# soloflow/drivers/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, List, Optional

@dataclass
class DriverResult:
    content: str
    tool_calls: List[dict] = field(default_factory=list)  # tool-call records returned by OpenClaw
    raw: Any = None              # raw response, handy for debugging
    tokens_used: int = 0

class BaseDriver(ABC):
    """
    Abstract base class for agent drivers.
    Each driver corresponds to one agent runtime.
    Selected through the `driver: llm/openclaw/http` field in YAML.
    """
    @abstractmethod
    async def execute(self,
                      system_prompt: str,
                      user_message: str,
                      tools: Optional[list] = None,
                      config: Optional[dict] = None) -> DriverResult:
        ...

    @abstractmethod
    async def health_check(self) -> bool:
        """Check whether the driver is available."""
        ...

# soloflow/drivers/llm_driver.py: calls the LLM directly (the current default behavior)
from typing import Optional

from openai import AsyncOpenAI
from .base import BaseDriver, DriverResult

class LLMDriver(BaseDriver):
    def __init__(self, api_key: Optional[str] = None, base_url: Optional[str] = None, **kwargs):
        self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)

    async def execute(self, system_prompt, user_message, tools=None, config=None) -> DriverResult:
        cfg = config or {}
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message}
        ]

        kwargs = dict(
            model=cfg.get("model", "gpt-4o"),
            messages=messages,
            temperature=cfg.get("temperature", 0.7),
            max_tokens=cfg.get("max_tokens", 4096),
        )

        # Forward response_format (e.g. JSON mode) when the caller asks for it;
        # FlowEngine.dispatch relies on this for its planning call
        if "response_format" in cfg:
            kwargs["response_format"] = cfg["response_format"]

        # If tools are defined, convert them to the OpenAI function schema
        if tools:
            kwargs["tools"] = self._to_openai_tools(tools)
            kwargs["tool_choice"] = "auto"

        resp = await self.client.chat.completions.create(**kwargs)
        msg = resp.choices[0].message

        # tc.function.arguments is a JSON-encoded string, not a dict
        tool_calls = [{"name": tc.function.name, "args": tc.function.arguments}
                      for tc in (msg.tool_calls or [])]

        return DriverResult(
            content=msg.content or "",
            tool_calls=tool_calls,
            raw=resp,
            tokens_used=resp.usage.total_tokens if resp.usage else 0
        )

    async def health_check(self) -> bool:
        return True  # not actually probed; availability depends on the API key being valid

    def _to_openai_tools(self, tools: list) -> list:
        return [{
            "type": "function",
            "function": {
                "name": t["name"],
                "description": t.get("description", ""),
                "parameters": t.get("parameters", {"type": "object", "properties": {}})
            }
        } for t in tools]

# soloflow/drivers/openclaw_driver.py: connects to OpenClaw (the key addition)
import asyncio
import uuid
from typing import Optional

import httpx
from .base import BaseDriver, DriverResult

class OpenClawDriver(BaseDriver):
    """
    OpenClaw driver.
    How it works:
    1. Send the task to an OpenClaw instance
    2. OpenClaw executes it (possibly calling MCP tools, writing code, etc.)
    3. Fetch the result by polling the task status
       (a webhook callback is the alternative; it is not implemented here)
    """
    def __init__(self, endpoint: str, api_key: Optional[str] = None,
                 poll_interval: int = 5, timeout: int = 300, **kwargs):
        self.endpoint = endpoint.rstrip("/")
        self.api_key = api_key
        self.poll_interval = poll_interval
        self.timeout = timeout

    async def execute(self, system_prompt, user_message, tools=None, config=None) -> DriverResult:
        task_id = str(uuid.uuid4())
        headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}

        payload = {
            "task_id": task_id,
            "system_prompt": system_prompt,
            "message": user_message,
            "tools": tools or [],
        }

        async with httpx.AsyncClient(timeout=30) as client:
            # 1. Submit the task
            resp = await client.post(
                f"{self.endpoint}/tasks",
                json=payload,
                headers=headers
            )
            resp.raise_for_status()
            job = resp.json()
            remote_task_id = job.get("id", task_id)

            # 2. Poll until the task finishes
            elapsed = 0
            while elapsed < self.timeout:
                await asyncio.sleep(self.poll_interval)
                elapsed += self.poll_interval

                status_resp = await client.get(
                    f"{self.endpoint}/tasks/{remote_task_id}",
                    headers=headers
                )
                status_resp.raise_for_status()  # surface HTTP errors instead of parsing an error body
                status = status_resp.json()

                if status.get("status") in ("done", "completed"):
                    return DriverResult(
                        content=status.get("result", ""),
                        tool_calls=status.get("tool_calls", []),
                        raw=status,
                        tokens_used=status.get("tokens_used", 0)
                    )
                elif status.get("status") in ("failed", "error"):
                    raise RuntimeError(f"OpenClaw task failed: {status.get('error')}")

            raise TimeoutError(f"OpenClaw task {remote_task_id} timed out after {self.timeout}s")

    async def health_check(self) -> bool:
        try:
            async with httpx.AsyncClient(timeout=5) as client:
                r = await client.get(f"{self.endpoint}/health")
                return r.status_code == 200
        except Exception:
            return False

# soloflow/drivers/__init__.py: driver registry, selected by string name
from .llm_driver import LLMDriver
from .openclaw_driver import OpenClawDriver
from .base import BaseDriver

DRIVER_REGISTRY = {
    "llm": LLMDriver,
    "openclaw": OpenClawDriver,
}

def create_driver(driver_type: str, **kwargs) -> BaseDriver:
    cls = DRIVER_REGISTRY.get(driver_type)
    if not cls:
        raise ValueError(f"Unknown driver: {driver_type}. Available: {list(DRIVER_REGISTRY)}")
    return cls(**kwargs)
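
The architecture diagram lists a MockDriver, but the proposal does not show one. The following is a minimal sketch of how such a test double could plug into the registry. The `MockDriver` class, its `replies` parameter, and the `"mock"` registry key are assumptions made for illustration; `BaseDriver` and `DriverResult` are re-declared in trimmed form only so the block runs on its own.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Type


@dataclass
class DriverResult:
    content: str
    tool_calls: List[dict] = field(default_factory=list)
    raw: Any = None
    tokens_used: int = 0


class BaseDriver(ABC):
    @abstractmethod
    async def execute(self, system_prompt: str, user_message: str,
                      tools: Optional[list] = None,
                      config: Optional[dict] = None) -> DriverResult: ...

    @abstractmethod
    async def health_check(self) -> bool: ...


class MockDriver(BaseDriver):
    """Hypothetical test double: returns canned replies and records calls."""

    def __init__(self, replies: Optional[List[str]] = None, **kwargs: Any):
        self.replies = list(replies or ["ok"])
        self.calls: List[Dict[str, Any]] = []

    async def execute(self, system_prompt, user_message, tools=None, config=None):
        self.calls.append({"system_prompt": system_prompt,
                           "user_message": user_message,
                           "tools": tools or []})
        # Cycle through the canned replies so repeated calls never run dry.
        reply = self.replies[(len(self.calls) - 1) % len(self.replies)]
        return DriverResult(content=reply)

    async def health_check(self) -> bool:
        return True


DRIVER_REGISTRY: Dict[str, Type[BaseDriver]] = {"mock": MockDriver}


def create_driver(driver_type: str, **kwargs: Any) -> BaseDriver:
    cls = DRIVER_REGISTRY.get(driver_type)
    if cls is None:
        raise ValueError(f"Unknown driver: {driver_type}")
    return cls(**kwargs)


driver = create_driver("mock", replies=["first", "second"])
first = asyncio.run(driver.execute("sys", "hello"))
second = asyncio.run(driver.execute("sys", "again"))
```

A driver like this would let the orchestration layer be tested without network access or API keys, since an agent YAML could simply declare `driver: mock`.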

2. Update the YAML format: add a driver field

# soloflow/agents/assistant.yaml (updated)
name: assistant
alias: 小助
role: The boss's assistant and the single conversational entry point
driver: llm          # ← new: selects the runtime, defaults to llm
model: gpt-4o
temperature: 0.3
can_delegate: true

system_prompt: |
  You are "小助", the boss's dedicated AI assistant...
  
skills: []           # with the llm driver, skills are descriptive only; with the openclaw driver they are actually executed

---
# soloflow/agents/coder.yaml (new: example for a software-development scenario)
name: coder
alias: 小码
role: Full-stack engineer responsible for implementing code
driver: openclaw     # ← uses OpenClaw to actually execute code
driver_config:
  endpoint: ${OPENCLAW_ENDPOINT}   # read from the environment
  timeout: 600
model: claude-3-5-sonnet           # the model OpenClaw uses internally

skills:
  - name: write_file
    description: Write a file
    parameters:
      type: object
      properties:
        path: {type: string}
        content: {type: string}
  - name: run_command
    description: Run a terminal command
    parameters:
      type: object
      properties:
        cmd: {type: string}
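3. Add ContextBus: context passing between tasks

# soloflow/context_bus.py: solves data passing between tasks
from typing import Any, Dict, Optional
import json, sqlite3, time

class ContextBus:
    """
    Context bus shared between tasks.
    After a task finishes it can publish key-value pairs;
    downstream tasks read specific keys via get() to obtain the data.
    Everything is stored in SQLite, so it works across processes.
    """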
    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS context_bus (
                flow_id TEXT NOT NULL,
                key TEXT NOT NULL,
                value TEXT,
                task_id TEXT,
                created_at REAL,
                PRIMARY KEY (flow_id, key)
            )
        """)
        self.conn.commit()
    
    def publish(self, flow_id: str, key: str, value: Any, task_id: Optional[str] = None):
        """Publish a finished task's result to the context."""
        self.conn.execute(
            "INSERT OR REPLACE INTO context_bus VALUES (?,?,?,?,?)",
            (flow_id, key, json.dumps(value), task_id, time.time())
        )
        self.conn.commit()
    
    def get(self, flow_id: str, key: str, default=None) -> Any:
        """Fetch a single context value."""
        row = self.conn.execute(
            "SELECT value FROM context_bus WHERE flow_id=? AND key=?",
            (flow_id, key)
        ).fetchone()
        return json.loads(row[0]) if row else default
    
    def get_all(self, flow_id: str) -> Dict[str, Any]:
        """Fetch the entire context of a flow."""
        rows = self.conn.execute(
            "SELECT key, value FROM context_bus WHERE flow_id=?",
            (flow_id,)
        ).fetchall()
        return {r[0]: json.loads(r[1]) for r in rows}
    
    def build_context_prompt(self, flow_id: str) -> str:
        """Format the context as text that can be injected into a prompt."""
        ctx = self.get_all(flow_id)
        if not ctx:
            return ""
        lines = ["\n[Upstream task results, for reference]"]
        for k, v in ctx.items():
            preview = str(v)[:300] + "..." if len(str(v)) > 300 else str(v)
            lines.append(f"- {k}: {preview}")
        return "\n".join(lines)
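
To make the publish/read cycle concrete, here is a small usage sketch. `MiniContextBus` is a trimmed stand-in written for this example (same table layout, in-memory database) so that the block runs on its own; the flow and key names are made up.

```python
import json
import sqlite3
import time
from typing import Any, Dict


class MiniContextBus:
    """Trimmed-down stand-in for ContextBus, just enough to show the flow."""

    def __init__(self, db_path: str = ":memory:"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS context_bus ("
            "flow_id TEXT NOT NULL, key TEXT NOT NULL, value TEXT, "
            "task_id TEXT, created_at REAL, PRIMARY KEY (flow_id, key))"
        )

    def publish(self, flow_id: str, key: str, value: Any, task_id: str = "") -> None:
        self.conn.execute(
            "INSERT OR REPLACE INTO context_bus VALUES (?,?,?,?,?)",
            (flow_id, key, json.dumps(value, ensure_ascii=False), task_id, time.time()),
        )
        self.conn.commit()

    def get_all(self, flow_id: str) -> Dict[str, Any]:
        rows = self.conn.execute(
            "SELECT key, value FROM context_bus WHERE flow_id=?",
            (flow_id,),
        ).fetchall()
        return {k: json.loads(v) for k, v in rows}


bus = MiniContextBus()
bus.publish("flow-1", "outline", "three-part script", task_id="t1")
bus.publish("flow-1", "draft", {"words": 1200}, task_id="t2")
bus.publish("flow-2", "outline", "unrelated flow", task_id="t3")
# Same flow and key as the first publish: the earlier value is replaced.
bus.publish("flow-1", "outline", "revised outline", task_id="t4")

flow1 = bus.get_all("flow-1")
```

Because the primary key is (flow_id, key), flows stay isolated from one another, but two tasks in the same flow that publish under the same key overwrite each other. That matters when the key defaults to the agent name and one agent runs twice in a flow.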

4. Refactor runner.py: remove the hardcoded domain logic and turn it into a generic FlowEngine

# soloflow/runner.py (complete rewrite)
import asyncio, json, os
from typing import Dict, List, Optional
from .fsm import TaskFSM, TaskStatus
from .agent_loader import AgentLoader
from .memory import PreferenceMemory
from .context_bus import ContextBus
from .drivers import create_driver, BaseDriver

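class FlowEngine:
    """
    Generic flow execution engine.
    
    Key changes:
    1. No domain knowledge is hardcoded any more
    2. ContextBus passes data between tasks
    3. The Driver layer supports any agent runtime
    4. A flow_id tracks every task in one complete conversation
    """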
    
    def __init__(self, db_path: str = "data/soloflow.db",
                 agents_dir: str = "soloflow/agents",
                 env: dict = None):
        self.fsm = TaskFSM(db_path)
        self.loader = AgentLoader(agents_dir)
        self.memory = PreferenceMemory(db_path)
        self.context_bus = ContextBus(db_path)
        self.env = env or dict(os.environ)  # used to resolve ${VAR} in YAML
        self._drivers: Dict[str, BaseDriver] = {}  # cache of driver instances
    
    def _get_driver(self, agent_name: str) -> BaseDriver:
        """Get or create the driver for an agent (lazy loading + caching)."""
        if agent_name in self._drivers:
            return self._drivers[agent_name]
        
        cfg = self.loader.get(agent_name)
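        driver_type = cfg.driver  # read from YAML
        driver_cfg = cfg.driver_config or {}
        
        # Resolve ${VAR} placeholders from the environment; an unset variable
        # leaves the placeholder string unchanged
        resolved = {k: self.env.get(v[2:-1], v)
                    if isinstance(v, str) and v.startswith("${") and v.endswith("}") else v
                    for k, v in driver_cfg.items()}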
        
        driver = create_driver(driver_type, **resolved)
        self._drivers[agent_name] = driver
        return driver
    
    async def dispatch(self, user_input: str, flow_id: str = None) -> dict:
        """
        Main entry point: purely generic logic with no video/content-creation domain knowledge.
        
        Returns dict:
          flow_id, understanding, tasks: [{id, agent, alias, result}]
        """
        import uuid
        flow_id = flow_id or str(uuid.uuid4())[:8]
        
        # Step 1: let the assistant do a generic task breakdown
        assistant_cfg = self.loader.get("assistant")
        pref_ctx = self.memory.format_for_prompt("assistant")
        
        available_agents = [
            {"name": cfg.name, "alias": cfg.alias, "role": cfg.role}
            for cfg in self.loader.all().values()
            if cfg.name != "assistant"
        ]
        
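        # Build the agent list dynamically instead of hardcoding it
        agent_list_str = "\n".join(
            f"- {a['name']} ({a['alias']}): {a['role']}"
            for a in available_agents
        )
        
        plan_prompt = f"""You are {assistant_cfg.alias}, the boss's dedicated assistant.

Available AI employees:
{agent_list_str}

Boss preference memory:
{pref_ctx}

Analyze the boss's request, break it into subtasks, and assign each one to a suitable employee.
Return JSON:
{{
  "understanding": "your understanding of the boss's request",
  "need_confirm": false,
  "tasks": [
    {{"agent": "employee name", "title": "task title", "description": "detailed instructions",
      "publish_as": "result_key"}}
  ]
}}
"publish_as" is optional: it stores the task's result in the ContextBus under that key for downstream tasks."""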
        
        driver = self._get_driver("assistant")
        plan_result = await driver.execute(
            system_prompt=plan_prompt,
            user_message=user_input,
            config={"model": assistant_cfg.model, "temperature": 0.3,
                    "response_format": {"type": "json_object"}}
        )
        
        try:
            plan = json.loads(plan_result.content)
        except json.JSONDecodeError:
            return {"flow_id": flow_id, "error": "planning failed", "raw": plan_result.content}
        
        if plan.get("need_confirm"):
            return {"flow_id": flow_id, "need_confirm": True,
                    "understanding": plan.get("understanding")}
        
        # Step 2: run the tasks sequentially (could later become parallel DAG execution)
        task_results = []
        for t in plan.get("tasks", []):
            task = self.fsm.create(
                title=t["title"],
                description=t["description"],
                agent=t["agent"],
                context={"flow_id": flow_id}
            )
            
            result = await self.run_task(task.id, flow_id)
            
            # Publish to the ContextBus for downstream tasks
            publish_key = t.get("publish_as") or t["agent"]
            self.context_bus.publish(flow_id, publish_key, result, task.id)
            
            alias = self.loader.get(t["agent"]).alias
            task_results.append({"id": task.id, "agent": t["agent"],
                                  "alias": alias, "result": result})
        
        return {
            "flow_id": flow_id,
            "understanding": plan.get("understanding"),
            "tasks": task_results
        }
    
    async def run_task(self, task_id: str, flow_id: str = None) -> str:
        """Run a single task, automatically injecting the ContextBus context."""
        task = self.fsm.get(task_id)
        agent_cfg = self.loader.get(task.agent)
        flow_id = flow_id or task.context.get("flow_id", "")
        
        # Build the system prompt = role prompt + preference memory + upstream results
        pref_ctx = self.memory.format_for_prompt(task.agent)
        upstream_ctx = self.context_bus.build_context_prompt(flow_id) if flow_id else ""
        
        system_prompt = agent_cfg.system_prompt
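        # "暂无偏好记录" ("no preferences recorded yet") is compared as-is against
        # what format_for_prompt returns when memory is empty
        if pref_ctx and pref_ctx != "暂无偏好记录":
            system_prompt += f"\n\n[Boss preferences]\n{pref_ctx}"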
        if upstream_ctx:
            system_prompt += upstream_ctx
        
        # Convert skills into tools
        tools = [
            {"name": s.name, "description": s.description,
             "parameters": getattr(s, "parameters", {})}
            for s in agent_cfg.skills
        ] if agent_cfg.skills else None
        
        self.fsm.transition(task_id, TaskStatus.RUNNING)
        
        try:
            driver = self._get_driver(task.agent)
            dr = await driver.execute(
                system_prompt=system_prompt,
                user_message=f"{task.title}\n\n{task.description}",
                tools=tools,
                config={"model": agent_cfg.model,
                        "temperature": agent_cfg.temperature,
                        "max_tokens": agent_cfg.max_tokens}
            )
            
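            result = dr.content
            # "[需要确认]" ("needs confirmation") and "[WAIT_HUMAN]" are the markers
            # an agent writes into its output to request human review
            if "[需要确认]" in result or "[WAIT_HUMAN]" in result: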
                self.fsm.transition(task_id, TaskStatus.WAITING_HUMAN, result)
            else:
                self.fsm.transition(task_id, TaskStatus.DONE, result)
            
            return result
            
        except Exception as e:
            self.fsm.transition(task_id, TaskStatus.FAILED, str(e))
            raise
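
Step 2 above runs tasks one after another and notes that DAG-based parallelism could come later. A minimal sketch of that idea follows, assuming each planned task declares the tasks it depends on. The `run_dag` helper and the dependency mapping are hypothetical and not part of the proposal; `fake_run` stands in for `FlowEngine.run_task`.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def run_dag(
    tasks: Dict[str, List[str]],
    run_one: Callable[[str], Awaitable[str]],
) -> Dict[str, str]:
    """Run tasks level by level; tasks whose dependencies are done run together.

    `tasks` maps a task name to the names it depends on.
    """
    results: Dict[str, str] = {}
    pending = dict(tasks)
    while pending:
        ready = [name for name, deps in pending.items()
                 if all(d in results for d in deps)]
        if not ready:
            raise ValueError(f"cycle or unknown dependency among: {sorted(pending)}")
        # Everything in `ready` is independent, so it can run concurrently.
        outputs = await asyncio.gather(*(run_one(name) for name in ready))
        for name, output in zip(ready, outputs):
            results[name] = output
            del pending[name]
    return results


order: List[str] = []


async def fake_run(name: str) -> str:
    # Stand-in for FlowEngine.run_task: records start order and echoes the name.
    order.append(name)
    await asyncio.sleep(0)
    return f"{name}-done"


plan = {
    "idea": [],
    "writer": ["idea"],
    "editor": ["idea"],
    "publisher": ["writer", "editor"],
}
results = asyncio.run(run_dag(plan, fake_run))
```

Here `writer` and `editor` run in the same batch once `idea` finishes, and `publisher` waits for both. In a real engine each batch would publish to the ContextBus before the next batch starts, so downstream prompts see the upstream results.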

5. Add an MCP server interface (the key to OpenClaw integration)

# soloflow/mcp_server.py: lets OpenClaw call SoloFlow
from fastapi import HTTPException

# Tools that SoloFlow exposes, in the MCP tool-definition shape
# (name / description / inputSchema)
SOLOFLOW_TOOLS = [
    {
        "name": "create_task",
        "description": "Create and run a SoloFlow task to be completed by an AI employee",
        "inputSchema": {
            "type": "object",
            "properties": {
                "agent": {"type": "string", "description": "Name of the target employee"},
                "title": {"type": "string"},
                "description": {"type": "string"},
                "flow_id": {"type": "string", "description": "Optional: attach to an existing conversation flow"}
            },
            "required": ["agent", "title"]
        }
    },
    {
        "name": "get_task_result",
        "description": "Get the result of a task",
        "inputSchema": {
            "type": "object",
            "properties": {"task_id": {"type": "string"}},
            "required": ["task_id"]
        }
    },
    {
        "name": "list_agents",
        "description": "List all available AI employees",
        "inputSchema": {"type": "object", "properties": {}}
    }
]

def create_mcp_router(engine):  # engine: FlowEngine
    from fastapi import APIRouter
    router = APIRouter(prefix="/mcp")
    
    @router.get("/tools")
    def list_tools():
        """MCP-style endpoint: return the tool list (MCP itself carries this as the JSON-RPC method tools/list)."""
        return {"tools": SOLOFLOW_TOOLS}
    
    @router.post("/tools/call")
    async def call_tool(body: dict):
        """MCP-style endpoint: execute a tool call (MCP itself carries this as the JSON-RPC method tools/call)."""
        tool_name = body.get("name")
        args = body.get("arguments", {})
        
        if tool_name == "create_task":
            task = engine.fsm.create(
                title=args["title"],
                description=args.get("description", ""),
                agent=args["agent"],
                context={"flow_id": args.get("flow_id", "")}
            )
            result = await engine.run_task(task.id, args.get("flow_id"))
            return {"content": [{"type": "text", "text": result}]}
        
        elif tool_name == "get_task_result":
            task = engine.fsm.get(args["task_id"])
            return {"content": [{"type": "text", "text": task.result or "Task not finished"}]}
        
        elif tool_name == "list_agents":
            agents = engine.loader.list_agents()
            return {"content": [{"type": "text", "text": str(agents)}]}
        
        raise HTTPException(400, f"Unknown tool: {tool_name}")
    
    return router
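
As the tool list grows, the if/elif chain in call_tool could become a registry of handlers. The sketch below shows one possible shape without any web framework. `ToolDispatcher`, `text_result`, and the stub handlers are hypothetical; the `isError` flag is modeled on the field of the same name in MCP tool results, and the handlers only imitate what the engine calls would return.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

ToolHandler = Callable[[Dict[str, Any]], Awaitable[str]]


def text_result(text: str, is_error: bool = False) -> Dict[str, Any]:
    """Wrap plain text in the content-list shape used by the router above."""
    return {"content": [{"type": "text", "text": text}], "isError": is_error}


class ToolDispatcher:
    def __init__(self) -> None:
        self._handlers: Dict[str, ToolHandler] = {}

    def register(self, name: str, handler: ToolHandler) -> None:
        self._handlers[name] = handler

    async def call(self, body: Dict[str, Any]) -> Dict[str, Any]:
        handler = self._handlers.get(body.get("name", ""))
        if handler is None:
            return text_result(f"Unknown tool: {body.get('name')}", is_error=True)
        try:
            return text_result(await handler(body.get("arguments") or {}))
        except KeyError as exc:
            # A missing required argument surfaces as a tool error, not a crash.
            return text_result(f"Missing argument: {exc.args[0]}", is_error=True)


async def list_agents(args: Dict[str, Any]) -> str:
    return "assistant, coder"  # stand-in for engine.loader.list_agents()


async def create_task(args: Dict[str, Any]) -> str:
    return f"ran {args['title']} on {args['agent']}"  # stand-in for engine.run_task


dispatcher = ToolDispatcher()
dispatcher.register("list_agents", list_agents)
dispatcher.register("create_task", create_task)

ok = asyncio.run(dispatcher.call(
    {"name": "create_task", "arguments": {"agent": "coder", "title": "fix bug"}}))
missing = asyncio.run(dispatcher.call(
    {"name": "create_task", "arguments": {"agent": "coder"}}))
unknown = asyncio.run(dispatcher.call({"name": "nope"}))
```

Reporting bad input as a tool-level error keeps the calling agent in the loop so it can retry with corrected arguments, whereas an unhandled KeyError in the router would surface as an HTTP 500.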

File structure after the refactor

soloflow/
├── __init__.py
├── fsm.py              # ✅ keep (already solid)
├── agent_loader.py     # 🔧 parse the new driver/driver_config fields
├── memory.py           # ✅ keep
├── context_bus.py      # 🆕 new: context passing between tasks
├── runner.py           # 🔄 refactored into the generic FlowEngine
├── mcp_server.py       # 🆕 new: MCP protocol interface
├── web.py              # 🔧 slimmed down to routing only; business logic moves to runner
├── webhook.py          # ✅ keep
└── drivers/            # 🆕 new directory
    ├── __init__.py
    ├── base.py
    ├── llm_driver.py
    └── openclaw_driver.py

agents/                 # YAML configuration (decoupled from any domain)
├── base.yaml           # shared configuration
├── assistant.yaml      # driver: llm
├── idea.yaml           # driver: llm  
├── writer.yaml         # driver: llm
├── editor.yaml         # driver: llm
├── publisher.yaml      # driver: llm
└── coder.yaml          # 🆕 example: driver: openclaw

examples/               # 🆕 scenario examples (to demonstrate generality)
├── video_company/      # video production (the original scenario)
│   └── agents/
├── dev_team/           # software development team
│   └── agents/
└── ecommerce/          # e-commerce operations team
    └── agents/

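Complete configuration example for connecting OpenClaw

# agents/coder.yaml (takes effect once placed in any agents/ directory)
name: coder
alias: 小码
role: Full-stack engineer responsible for implementing and debugging code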
driver: openclaw
driver_config:
  endpoint: ${OPENCLAW_ENDPOINT}   # http://localhost:3100
  timeout: 600
  api_key: ${OPENCLAW_API_KEY}
model: claude-3-5-sonnet-20241022
temperature: 0.2
can_delegate: false

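system_prompt: |
  You are "小码", a professional full-stack engineer.
  When you receive a task, you will:
  1. Analyze the requirements and plan the implementation
  2. Use tools to write code and run tests
  3. Report the result when done
  
skills:
  - name: write_file
    description: Write a code file
    parameters:
      type: object
      properties:
        path: {type: string, description: File path}
        content: {type: string, description: File content}
      required: [path, content]
  - name: run_command
    description: Run a shell command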
    parameters:
      type: object
      properties:
        cmd: {type: string}
      required: [cmd]

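Then add the endpoint to .env (plus OPENCLAW_API_KEY if your OpenClaw instance requires one, since the YAML above references it):

OPENCLAW_ENDPOINT=http://localhost:3100

After docker compose up, 小码 will carry out code tasks for real through OpenClaw.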


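Implementation plan

Phase  Work item  Priority
P0  Add the drivers/ directory (base + llm + openclaw)
P0  Add context_bus.py
P0  Refactor runner.py → FlowEngine
P1  Add the driver/driver_config fields to agent_loader.py
P1  Add mcp_server.py
P2  Add the examples/ directory (dev_team / ecommerce)
P2  Slim down web.py

If the maintainers agree with this direction, I can submit a complete PR implementing all of the changes above. 🚀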
