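Background: core problems in the current version
After a detailed review of the latest code, SoloFlow v1 has the basic FSM + YAML + SQLite skeleton in place, but several structural problems keep it from being a general-purpose framework rather than a tool specific to video production: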
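Problem 1: runner.py hard-codes "video company" domain logic
# runner.py, line 33: the task-planning prompt hard-codes the agent names
"agent": "idea/writer/editor/publisher"
As a result the framework cannot be reused as-is for other scenarios (e-commerce, customer service, software development); switching to a different business means editing the source.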
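Problem 2: the agent execution layer has no tool-calling abstraction
run_task() currently makes a plain-text LLM call and nothing else. OpenClaw's core capability is tool use, yet AgentSkill is only a descriptive record with no actual function-call mechanism behind it:
# agent_loader.py: skills are decorative and never executed
@dataclass
class AgentSkill:
    name: str
    description: str = ""  # a handler / function_schema belongs here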
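Problem 3: no context passing between tasks
When several agents run in sequence, a later agent cannot see the output of an earlier one. The Task.context field exists, but runner.py never populates it when creating subtasks:
# runner.py, line 93: the previous task's result is not passed to the next one
task = self.fsm.create(
    title=t.get("title", ""),
    description=t.get("description", ""),
    agent=t.get("agent", "assistant")
    # context=??? upstream results are never passed in
)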
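Problem 4: no way to integrate with OpenClaw (missing heartbeat + MCP protocol layer)
OpenClaw's workflow is: receive a task → call MCP tools → report completion through a heartbeat/webhook callback. SoloFlow currently exposes no MCP server interface and has no callback mechanism for asynchronous task completion.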
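Problem 5: web.py is too heavy and carries too much business logic
soloflow/web.py (11973 bytes) mixes routing, business logic, and HTML rendering, which will make it hard to extend.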
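Refactoring goals
Positioning in one sentence: SoloFlow is a task-orchestration core that is not tied to any vertical. Business logic is driven by YAML, and a Driver layer connects it to any agent runtime (direct LLM calls / OpenClaw / Claude Code / HTTP).
Layered architecture after the refactor: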
┌─────────────────────────────────────────────────────────┐
│                     Interface Layer                     │
│       FastAPI Web UI │ CLI │ Webhook │ MCP Server       │
└──────────────────────────┬──────────────────────────────┘
                           │
┌──────────────────────────▼──────────────────────────────┐
│                   Orchestration Layer                   │
│   FlowEngine (workflow DAG) + TaskFSM (state machine)   │
│       ContextBus (context passing between tasks)        │
└──────────────────────────┬──────────────────────────────┘
                           │
┌──────────────────────────▼──────────────────────────────┐
│                Agent Driver Layer (NEW)                 │
│  LLMDriver │ OpenClawDriver │ HTTPDriver │ MockDriver   │
└──────────────────────────┬──────────────────────────────┘
                           │
┌──────────────────────────▼──────────────────────────────┐
│                  Config / Memory Layer                  │
│      YAML AgentLoader + PreferenceMemory (SQLite)       │
└─────────────────────────────────────────────────────────┘
Refactoring plan and code
1. Add soloflow/drivers/: the Agent Driver abstraction layer
This is the core change in this refactor, and it is what makes OpenClaw integration possible.
# soloflow/drivers/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class DriverResult:
    content: str
    # Tool-call records returned by the runtime (e.g. OpenClaw).
    # default_factory gives a real empty list instead of a None default.
    tool_calls: list = field(default_factory=list)
    raw: Any = None       # raw response, kept for debugging
    tokens_used: int = 0


class BaseDriver(ABC):
    """
    Abstract base class for agent drivers.
    Each driver corresponds to one agent runtime.
    Selected through the `driver: llm/openclaw/http` field in YAML.
    """

    @abstractmethod
    async def execute(self,
                      system_prompt: str,
                      user_message: str,
                      tools: Optional[list] = None,
                      config: Optional[dict] = None) -> DriverResult:
        ...

    @abstractmethod
    async def health_check(self) -> bool:
        """Check whether the driver is usable."""
        ...
# soloflow/drivers/llm_driver.py: calls the LLM directly (the current default behaviour)
from openai import AsyncOpenAI
from .base import BaseDriver, DriverResult


class LLMDriver(BaseDriver):
    def __init__(self, api_key: str = None, base_url: str = None, **kwargs):
        self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)

    async def execute(self, system_prompt, user_message, tools=None, config=None) -> DriverResult:
        cfg = config or {}
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message}
        ]
        kwargs = dict(
            model=cfg.get("model", "gpt-4o"),
            messages=messages,
            temperature=cfg.get("temperature", 0.7),
            max_tokens=cfg.get("max_tokens", 4096),
        )
        # Forward response_format (e.g. JSON mode) when the caller sets it;
        # FlowEngine.dispatch relies on this for the planning call
        if cfg.get("response_format"):
            kwargs["response_format"] = cfg["response_format"]
        # If tools are defined, convert them to the OpenAI function schema
        if tools:
            kwargs["tools"] = self._to_openai_tools(tools)
            kwargs["tool_choice"] = "auto"
        resp = await self.client.chat.completions.create(**kwargs)
        msg = resp.choices[0].message
        tool_calls = []
        if hasattr(msg, "tool_calls") and msg.tool_calls:
            # tc.function.arguments is a JSON-encoded string, not a dict
            tool_calls = [{"name": tc.function.name, "args": tc.function.arguments}
                          for tc in msg.tool_calls]
        return DriverResult(
            content=msg.content or "",
            tool_calls=tool_calls,
            tokens_used=resp.usage.total_tokens if resp.usage else 0
        )

    async def health_check(self) -> bool:
        return True  # no real probe: assumes the API key is valid

    def _to_openai_tools(self, tools: list) -> list:
        return [{
            "type": "function",
            "function": {
                "name": t["name"],
                "description": t.get("description", ""),
                "parameters": t.get("parameters", {"type": "object", "properties": {}})
            }
        } for t in tools]
# soloflow/drivers/openclaw_driver.py: OpenClaw integration (the key addition)
import asyncio
import httpx
import uuid
from .base import BaseDriver, DriverResult


class OpenClawDriver(BaseDriver):
    """
    OpenClaw driver.
    How it works:
    1. Submit the task to an OpenClaw instance
    2. OpenClaw executes it (possibly calling MCP tools, writing code, etc.)
    3. Fetch the result by polling the task status
       (only polling is implemented here; a webhook callback is the alternative)
    """

    def __init__(self, endpoint: str, api_key: str = None,
                 poll_interval: int = 5, timeout: int = 300, **kwargs):
        self.endpoint = endpoint.rstrip("/")
        self.api_key = api_key
        self.poll_interval = poll_interval
        self.timeout = timeout

    async def execute(self, system_prompt, user_message, tools=None, config=None) -> DriverResult:
        task_id = str(uuid.uuid4())
        headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}
        payload = {
            "task_id": task_id,
            "system_prompt": system_prompt,
            "message": user_message,
            "tools": tools or [],
        }
        async with httpx.AsyncClient(timeout=30) as client:
            # 1. Submit the task
            resp = await client.post(
                f"{self.endpoint}/tasks",
                json=payload,
                headers=headers
            )
            resp.raise_for_status()
            job = resp.json()
            remote_task_id = job.get("id", task_id)
            # 2. Poll until the task finishes
            elapsed = 0
            while elapsed < self.timeout:
                await asyncio.sleep(self.poll_interval)
                elapsed += self.poll_interval
                status_resp = await client.get(
                    f"{self.endpoint}/tasks/{remote_task_id}",
                    headers=headers
                )
                # Surface HTTP errors instead of parsing an error body as a status
                status_resp.raise_for_status()
                status = status_resp.json()
                if status.get("status") in ("done", "completed"):
                    return DriverResult(
                        content=status.get("result", ""),
                        tool_calls=status.get("tool_calls", []),
                        tokens_used=status.get("tokens_used", 0)
                    )
                elif status.get("status") in ("failed", "error"):
                    raise RuntimeError(f"OpenClaw task failed: {status.get('error')}")
        raise TimeoutError(f"OpenClaw task {remote_task_id} timed out after {self.timeout}s")

    async def health_check(self) -> bool:
        try:
            async with httpx.AsyncClient(timeout=5) as client:
                r = await client.get(f"{self.endpoint}/health")
                return r.status_code == 200
        except Exception:
            return False
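The polling loop in execute() charges only the sleep time against the timeout, so slow status requests can push the real wait past the configured limit. A minimal sketch of a deadline-based variant built on time.monotonic() follows. The names poll_until and fetch_status are hypothetical helpers for illustration and are not part of SoloFlow or OpenClaw; the status values are the ones the driver above already checks.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def poll_until(fetch_status: Callable[[], Awaitable[dict]],
                     timeout: float,
                     interval: float) -> dict:
    """Call fetch_status() until it reports a terminal state or the deadline passes."""
    deadline = time.monotonic() + timeout
    while True:
        status = await fetch_status()
        state = status.get("status")
        if state in ("done", "completed"):
            return status
        if state in ("failed", "error"):
            raise RuntimeError(f"remote task failed: {status.get('error')}")
        # Give up if the next sleep would carry us past the deadline.
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"no terminal status within {timeout}s")
        await asyncio.sleep(interval)


def demo() -> dict:
    """Simulate a remote task that finishes on the third poll."""
    replies = iter([
        {"status": "running"},
        {"status": "running"},
        {"status": "done", "result": "ok"},
    ])

    async def fake_fetch() -> dict:
        return next(replies)

    return asyncio.run(poll_until(fake_fetch, timeout=1.0, interval=0.01))
```

In the driver, fetch_status would wrap the GET on the task URL, which keeps the HTTP details out of the timing logic and makes the loop testable without a running OpenClaw instance.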
# soloflow/drivers/__init__.py: driver registry, selected by string name
from .llm_driver import LLMDriver
from .openclaw_driver import OpenClawDriver
from .base import BaseDriver

DRIVER_REGISTRY = {
    "llm": LLMDriver,
    "openclaw": OpenClawDriver,
}


def create_driver(driver_type: str, **kwargs) -> BaseDriver:
    cls = DRIVER_REGISTRY.get(driver_type)
    if not cls:
        raise ValueError(f"Unknown driver: {driver_type}. Available: {list(DRIVER_REGISTRY)}")
    return cls(**kwargs)
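The architecture diagram lists a MockDriver, but the proposal does not show one. The sketch below is one way it might look, useful for exercising the orchestration layer without network access. Everything about MockDriver here (the reply parameter, the calls list) is an assumption made for illustration, and BaseDriver / DriverResult are re-declared locally only so the snippet runs on its own.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional


# Local stand-ins for soloflow.drivers.base so the sketch runs on its own.
@dataclass
class DriverResult:
    content: str
    tool_calls: list = field(default_factory=list)
    raw: Any = None
    tokens_used: int = 0


class BaseDriver(ABC):
    @abstractmethod
    async def execute(self, system_prompt: str, user_message: str,
                      tools: Optional[list] = None,
                      config: Optional[dict] = None) -> DriverResult: ...

    @abstractmethod
    async def health_check(self) -> bool: ...


class MockDriver(BaseDriver):
    """Hypothetical test driver: returns a canned reply and records each call."""

    def __init__(self, reply: str = "mock reply", **kwargs):
        self.reply = reply
        self.calls: list = []

    async def execute(self, system_prompt, user_message, tools=None, config=None) -> DriverResult:
        self.calls.append({"system_prompt": system_prompt, "user_message": user_message})
        return DriverResult(content=self.reply)

    async def health_check(self) -> bool:
        return True


# Same lookup shape as the registry in soloflow/drivers/__init__.py.
DRIVER_REGISTRY = {"mock": MockDriver}


def create_driver(driver_type: str, **kwargs) -> BaseDriver:
    cls = DRIVER_REGISTRY.get(driver_type)
    if not cls:
        raise ValueError(f"Unknown driver: {driver_type}. Available: {list(DRIVER_REGISTRY)}")
    return cls(**kwargs)


driver = create_driver("mock", reply="hello")
result = asyncio.run(driver.execute("You are a test agent.", "ping"))
```

An agent YAML could then select it with driver: mock and pass reply through driver_config, assuming the loader forwards driver_config as keyword arguments the way _get_driver does in the FlowEngine code.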
2. Change the YAML format: add a driver field
# soloflow/agents/assistant.yaml (updated)
name: assistant
alias: 小助
role: the boss's assistant and the only conversational entry point
driver: llm          # new: selects the runtime, defaults to llm
model: gpt-4o
temperature: 0.3
can_delegate: true
system_prompt: |
  You are 小助, the boss's dedicated AI assistant...
skills: []           # under the llm driver skills are descriptive only; under the openclaw driver they are actually executed
---
# soloflow/agents/coder.yaml (new: example for a software-development scenario)
name: coder
alias: 小码
role: full-stack engineer, responsible for implementing code
driver: openclaw     # new: code is actually executed through OpenClaw
driver_config:
  endpoint: ${OPENCLAW_ENDPOINT}   # read from an environment variable
  timeout: 600
model: claude-3-5-sonnet           # model used inside OpenClaw
skills:
  - name: write_file
    description: Write a file
    parameters:
      type: object
      properties:
        path: {type: string}
        content: {type: string}
  - name: run_command
    description: Run a terminal command
    parameters:
      type: object
      properties:
        cmd: {type: string}
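The coder.yaml example relies on ${OPENCLAW_ENDPOINT} being replaced from the environment at load time. A minimal sketch of how such placeholders could be expanded is shown here; resolve_env is a hypothetical helper name, and the choice to leave unknown variables untouched (rather than raise) is an assumption, not documented SoloFlow behaviour.

```python
import re
from typing import Any, Mapping

# Matches ${NAME} where NAME is a conventional environment-variable identifier.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def resolve_env(value: Any, env: Mapping[str, str]) -> Any:
    """Recursively expand ${VAR} in strings, dicts, and lists.

    Variables missing from env are left as the literal placeholder.
    """
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: env.get(m.group(1), m.group(0)), value)
    if isinstance(value, dict):
        return {k: resolve_env(v, env) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_env(v, env) for v in value]
    return value


# Illustrative driver_config; the values are examples, not required keys.
driver_config = {
    "endpoint": "${OPENCLAW_ENDPOINT}",
    "api_key": "${OPENCLAW_API_KEY}",
    "timeout": 600,
}
resolved = resolve_env(driver_config, {"OPENCLAW_ENDPOINT": "http://localhost:3100"})
```

Using a regex instead of str.strip("${}") also handles placeholders embedded in a longer string, such as ${OPENCLAW_ENDPOINT}/v1. Note that an unresolved api_key stays as the literal placeholder, so a stricter loader may prefer to raise on missing variables.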
3. Add ContextBus: context passing between tasks
# soloflow/context_bus.py: solves data passing between tasks
from typing import Any, Dict
import json, sqlite3, time


class ContextBus:
    """
    Context bus shared by the tasks of one flow.
    After a task finishes it can publish key/value pairs;
    downstream tasks read a specific key with get(), or everything with get_all().
    All data lives in SQLite, so it can be shared across processes.
    """

    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS context_bus (
                flow_id TEXT NOT NULL,
                key TEXT NOT NULL,
                value TEXT,
                task_id TEXT,
                created_at REAL,
                PRIMARY KEY (flow_id, key)
            )
        """)
        self.conn.commit()

    def publish(self, flow_id: str, key: str, value: Any, task_id: str = None):
        """Publish a task's result to the context once the task is done."""
        self.conn.execute(
            "INSERT OR REPLACE INTO context_bus VALUES (?,?,?,?,?)",
            (flow_id, key, json.dumps(value), task_id, time.time())
        )
        self.conn.commit()

    def get(self, flow_id: str, key: str, default=None) -> Any:
        """Read one context value."""
        row = self.conn.execute(
            "SELECT value FROM context_bus WHERE flow_id=? AND key=?",
            (flow_id, key)
        ).fetchone()
        return json.loads(row[0]) if row else default

    def get_all(self, flow_id: str) -> Dict[str, Any]:
        """Read the full context of a flow."""
        rows = self.conn.execute(
            "SELECT key, value FROM context_bus WHERE flow_id=?",
            (flow_id,)
        ).fetchall()
        return {r[0]: json.loads(r[1]) for r in rows}

    def build_context_prompt(self, flow_id: str) -> str:
        """Format the context as text that can be injected into a prompt."""
        ctx = self.get_all(flow_id)
        if not ctx:
            return ""
        lines = ["\n[Upstream task results - use as reference]"]
        for k, v in ctx.items():
            # Truncate long values to 300 characters to keep the prompt small
            preview = str(v)[:300] + "..." if len(str(v)) > 300 else str(v)
            lines.append(f"- {k}: {preview}")
        return "\n".join(lines)
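Because the table's primary key is (flow_id, key), publishing the same key twice within one flow overwrites the earlier value, while different flows stay isolated. The sketch below demonstrates that behaviour against an in-memory SQLite database, using the same schema and statements as ContextBus but as free functions so it runs standalone; the flow ids and values are made up for illustration.

```python
import json
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE context_bus (
        flow_id TEXT NOT NULL,
        key TEXT NOT NULL,
        value TEXT,
        task_id TEXT,
        created_at REAL,
        PRIMARY KEY (flow_id, key)
    )
""")


def publish(flow_id, key, value, task_id=None):
    # INSERT OR REPLACE acts as an upsert on the (flow_id, key) primary key.
    conn.execute(
        "INSERT OR REPLACE INTO context_bus VALUES (?,?,?,?,?)",
        (flow_id, key, json.dumps(value), task_id, time.time()),
    )
    conn.commit()


def get_all(flow_id):
    rows = conn.execute(
        "SELECT key, value FROM context_bus WHERE flow_id=?", (flow_id,)
    ).fetchall()
    return {k: json.loads(v) for k, v in rows}


publish("flow-a", "idea", "ten video topics", task_id="t1")
publish("flow-a", "idea", "five video topics", task_id="t2")  # same key: overwrites t1
publish("flow-b", "idea", {"topics": ["unrelated"]}, task_id="t3")

flow_a = get_all("flow-a")
flow_b = get_all("flow-b")
```

One consequence for the FlowEngine design: when a plan omits publish_as, the key defaults to the agent name, so two tasks handled by the same agent in one flow will overwrite each other's results. Giving each task a distinct publish_as avoids that.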
4. Refactor runner.py: remove the domain hard-coding and turn it into a generic FlowEngine
# soloflow/runner.py (full rewrite)
import asyncio, json, os
from typing import Dict, List, Optional
from .fsm import TaskFSM, TaskStatus
from .agent_loader import AgentLoader
from .memory import PreferenceMemory
from .context_bus import ContextBus
from .drivers import create_driver, BaseDriver


class FlowEngine:
    """
    Generic flow execution engine.
    Key changes:
    1. No domain knowledge is hard-coded any more
    2. ContextBus passes data between tasks
    3. The Driver layer supports any agent runtime
    4. A flow_id tracks every task that belongs to one conversation
    """

    def __init__(self, db_path: str = "data/soloflow.db",
                 agents_dir: str = "soloflow/agents",
                 env: dict = None):
        self.fsm = TaskFSM(db_path)
        self.loader = AgentLoader(agents_dir)
        self.memory = PreferenceMemory(db_path)
        self.context_bus = ContextBus(db_path)
        self.env = env or dict(os.environ)         # used to substitute ${VAR} in YAML
        self._drivers: Dict[str, BaseDriver] = {}  # cache of driver instances

    def _get_driver(self, agent_name: str) -> BaseDriver:
        """Get or create the driver for an agent (lazy creation + cache)."""
        if agent_name in self._drivers:
            return self._drivers[agent_name]
        cfg = self.loader.get(agent_name)
        driver_type = cfg.driver  # read from YAML
        driver_cfg = cfg.driver_config or {}
        # Substitute ${VAR} placeholders from the environment.
        # v[2:-1] removes exactly "${" and "}"; str.strip("${}") would also
        # eat any of those characters inside the variable name's edges.
        resolved = {k: self.env.get(v[2:-1], v)
                    if isinstance(v, str) and v.startswith("${") and v.endswith("}") else v
                    for k, v in driver_cfg.items()}
        driver = create_driver(driver_type, **resolved)
        self._drivers[agent_name] = driver
        return driver
    async def dispatch(self, user_input: str, flow_id: str = None) -> dict:
        """
        Main entry point: purely generic logic with no video/content-creation knowledge.
        Returns dict:
            flow_id, understanding, tasks: [{id, agent, alias, result}]
        """
        import uuid
        flow_id = flow_id or str(uuid.uuid4())[:8]
        # Step 1: have the assistant do a generic task breakdown
        assistant_cfg = self.loader.get("assistant")
        pref_ctx = self.memory.format_for_prompt("assistant")
        available_agents = [
            {"name": cfg.name, "alias": cfg.alias, "role": cfg.role}
            for cfg in self.loader.all().values()
            if cfg.name != "assistant"
        ]
        # Build the agent list dynamically instead of hard-coding it
        agent_list_str = "\n".join(
            f"- {a['name']} ({a['alias']}): {a['role']}"
            for a in available_agents
        )
        plan_prompt = f"""You are {assistant_cfg.alias}, the boss's dedicated assistant.
Available AI employees:
{agent_list_str}
The boss's remembered preferences:
{pref_ctx}
Analyse the boss's request, break it into subtasks, and assign each to a suitable employee.
Reply in JSON:
{{
  "understanding": "your understanding of the request",
  "need_confirm": false,
  "tasks": [
    {{"agent": "employee name", "title": "task title", "description": "detailed instructions",
      "publish_as": "result_key"}}
  ]
}}
"publish_as" is optional: it names the ContextBus key under which the result is stored for downstream tasks."""
        driver = self._get_driver("assistant")
        plan_result = await driver.execute(
            system_prompt=plan_prompt,
            user_message=user_input,
            config={"model": assistant_cfg.model, "temperature": 0.3,
                    "response_format": {"type": "json_object"}}
        )
        try:
            plan = json.loads(plan_result.content)
        except json.JSONDecodeError:
            return {"flow_id": flow_id, "error": "planning failed", "raw": plan_result.content}
        if plan.get("need_confirm"):
            return {"flow_id": flow_id, "need_confirm": True,
                    "understanding": plan.get("understanding")}
        # Step 2: run the tasks sequentially (can later become parallel DAG execution)
        task_results = []
        for t in plan.get("tasks", []):
            task = self.fsm.create(
                title=t["title"],
                description=t["description"],
                agent=t["agent"],
                context={"flow_id": flow_id}
            )
            result = await self.run_task(task.id, flow_id)
            # Publish to ContextBus so downstream tasks can use the result
            publish_key = t.get("publish_as") or t["agent"]
            self.context_bus.publish(flow_id, publish_key, result, task.id)
            alias = self.loader.get(t["agent"]).alias
            task_results.append({"id": task.id, "agent": t["agent"],
                                 "alias": alias, "result": result})
        return {
            "flow_id": flow_id,
            "understanding": plan.get("understanding"),
            "tasks": task_results
        }
    async def run_task(self, task_id: str, flow_id: str = None) -> str:
        """Run a single task, injecting the ContextBus context automatically."""
        task = self.fsm.get(task_id)
        agent_cfg = self.loader.get(task.agent)
        flow_id = flow_id or (task.context or {}).get("flow_id", "")
        # system prompt = role prompt + preference memory + upstream results
        pref_ctx = self.memory.format_for_prompt(task.agent)
        upstream_ctx = self.context_bus.build_context_prompt(flow_id) if flow_id else ""
        system_prompt = agent_cfg.system_prompt
        # "暂无偏好记录" ("no preferences recorded yet") is the sentinel returned by PreferenceMemory
        if pref_ctx and pref_ctx != "暂无偏好记录":
            system_prompt += f"\n\n[Boss preferences]\n{pref_ctx}"
        if upstream_ctx:
            system_prompt += upstream_ctx
        # Convert skills into tool definitions
        tools = [
            {"name": s.name, "description": s.description,
             "parameters": getattr(s, "parameters", {})}
            for s in agent_cfg.skills
        ] if agent_cfg.skills else None
        self.fsm.transition(task_id, TaskStatus.RUNNING)
        try:
            driver = self._get_driver(task.agent)
            dr = await driver.execute(
                system_prompt=system_prompt,
                user_message=f"{task.title}\n\n{task.description}",
                tools=tools,
                config={"model": agent_cfg.model,
                        "temperature": agent_cfg.temperature,
                        "max_tokens": agent_cfg.max_tokens}
            )
            result = dr.content
            # "[需要确认]" ("needs confirmation") and "[WAIT_HUMAN]" are the markers an agent emits to pause
            if "[需要确认]" in result or "[WAIT_HUMAN]" in result:
                self.fsm.transition(task_id, TaskStatus.WAITING_HUMAN, result)
            else:
                self.fsm.transition(task_id, TaskStatus.DONE, result)
            return result
        except Exception as e:
            self.fsm.transition(task_id, TaskStatus.FAILED, str(e))
            raise
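dispatch() indexes t["agent"], t["title"], and t["description"] directly, so a plan that omits a field or names an agent with no YAML file fails with a KeyError partway through the flow. A minimal sketch of validating the plan before any task is created follows. validate_plan is a hypothetical helper, and the required-field list simply mirrors the keys dispatch() reads.

```python
from typing import Iterable


def validate_plan(plan: dict, known_agents: Iterable[str]) -> list:
    """Return a list of human-readable problems; an empty list means the plan is usable."""
    known = set(known_agents)
    tasks = plan.get("tasks")
    if not isinstance(tasks, list):
        return ["'tasks' must be a list"]
    problems = []
    for i, t in enumerate(tasks):
        if not isinstance(t, dict):
            problems.append(f"task {i}: not an object")
            continue
        # These are the keys dispatch() reads without a default.
        for field_name in ("agent", "title", "description"):
            if not t.get(field_name):
                problems.append(f"task {i}: missing '{field_name}'")
        agent = t.get("agent")
        if agent and agent not in known:
            problems.append(f"task {i}: unknown agent '{agent}'")
    return problems


plan = {
    "understanding": "write and publish a script",
    "tasks": [
        {"agent": "writer", "title": "Draft script", "description": "Write a 60-second script"},
        {"agent": "designer", "title": "Make cover"},
    ],
}
problems = validate_plan(plan, known_agents=["idea", "writer", "editor", "publisher"])
```

When the list is non-empty, the engine could return it in the same error shape it uses for a JSON parse failure, or feed it back to the assistant for one re-planning attempt.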
5. Add an MCP server interface (the key to OpenClaw integration)
# soloflow/mcp_server.py: lets OpenClaw call into SoloFlow
from fastapi import APIRouter, HTTPException

# Tools exposed by SoloFlow, described in the MCP tool format (name / description / inputSchema)
SOLOFLOW_TOOLS = [
    {
        "name": "create_task",
        "description": "Create and run a SoloFlow task to be completed by an AI employee",
        "inputSchema": {
            "type": "object",
            "properties": {
                "agent": {"type": "string", "description": "Name of the target employee"},
                "title": {"type": "string"},
                "description": {"type": "string"},
                "flow_id": {"type": "string", "description": "Optional; attaches the task to an existing flow"}
            },
            "required": ["agent", "title"]
        }
    },
    {
        "name": "get_task_result",
        "description": "Get the result of a task",
        "inputSchema": {
            "type": "object",
            "properties": {"task_id": {"type": "string"}},
            "required": ["task_id"]
        }
    },
    {
        "name": "list_agents",
        "description": "List all available AI employees",
        "inputSchema": {"type": "object", "properties": {}}
    }
]


def create_mcp_router(engine):  # engine: FlowEngine
    router = APIRouter(prefix="/mcp")

    @router.get("/tools")
    def list_tools():
        """Returns the tool list. Modelled on MCP's tools/list; MCP itself uses JSON-RPC 2.0, not REST."""
        return {"tools": SOLOFLOW_TOOLS}

    @router.post("/tools/call")
    async def call_tool(body: dict):
        """Executes a tool call. Modelled on MCP's tools/call request and result shape."""
        tool_name = body.get("name")
        args = body.get("arguments", {})
        if tool_name == "create_task":
            task = engine.fsm.create(
                title=args["title"],
                description=args.get("description", ""),
                agent=args["agent"],
                context={"flow_id": args.get("flow_id", "")}
            )
            result = await engine.run_task(task.id, args.get("flow_id"))
            return {"content": [{"type": "text", "text": result}]}
        elif tool_name == "get_task_result":
            task = engine.fsm.get(args["task_id"])
            return {"content": [{"type": "text", "text": task.result or "task not finished"}]}
        elif tool_name == "list_agents":
            agents = engine.loader.list_agents()
            return {"content": [{"type": "text", "text": str(agents)}]}
        raise HTTPException(400, f"Unknown tool: {tool_name}")

    return router
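The router above exposes REST endpoints whose payloads follow MCP's tool shapes. The MCP specification itself frames these operations as JSON-RPC 2.0 requests with the methods tools/list and tools/call, so a client that speaks MCP natively would need that envelope. The sketch below shows the mapping in pure Python; make_tools_call, handle_jsonrpc, and fake_call_tool are hypothetical names, and the transport (stdio or HTTP) is deliberately left out.

```python
from typing import Callable


def make_tools_call(request_id: int, name: str, arguments: dict) -> dict:
    """Build a JSON-RPC 2.0 request for the MCP `tools/call` method."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }


def handle_jsonrpc(request: dict, tools: list,
                   call_tool: Callable[[str, dict], str]) -> dict:
    """Dispatch `tools/list` and `tools/call`; other methods get a Method-not-found error."""
    rid = request.get("id")
    method = request.get("method")
    if method == "tools/list":
        return {"jsonrpc": "2.0", "id": rid, "result": {"tools": tools}}
    if method == "tools/call":
        params = request.get("params", {})
        text = call_tool(params["name"], params.get("arguments", {}))
        return {"jsonrpc": "2.0", "id": rid,
                "result": {"content": [{"type": "text", "text": text}]}}
    # -32601 is the JSON-RPC 2.0 code for "Method not found".
    return {"jsonrpc": "2.0", "id": rid,
            "error": {"code": -32601, "message": f"Method not found: {method}"}}


def fake_call_tool(name: str, arguments: dict) -> str:
    # Stand-in for the real tool execution; echoes what it was asked to do.
    return f"{name}:{arguments.get('title', '')}"


request = make_tools_call(1, "create_task", {"agent": "coder", "title": "Add tests"})
response = handle_jsonrpc(request, tools=[], call_tool=fake_call_tool)
```

The result payload is the same content list the REST handler returns, so the existing tool logic could sit behind either front end.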
File structure after the refactor
soloflow/
├── __init__.py
├── fsm.py              # ✅ kept (already solid)
├── agent_loader.py     # 🔧 parses the new driver/driver_config fields
├── memory.py           # ✅ kept
├── context_bus.py      # 🆕 new: context passing between tasks
├── runner.py           # 🔄 refactored into the generic FlowEngine
├── mcp_server.py       # 🆕 new: MCP protocol interface
├── web.py              # 🔧 slimmed down to routing only; business logic moves to runner
├── webhook.py          # ✅ kept
└── drivers/            # 🆕 new directory
    ├── __init__.py
    ├── base.py
    ├── llm_driver.py
    └── openclaw_driver.py
agents/                 # YAML configuration (decoupled from the domain)
├── base.yaml           # shared configuration
├── assistant.yaml      # driver: llm
├── idea.yaml           # driver: llm
├── writer.yaml         # driver: llm
├── editor.yaml         # driver: llm
├── publisher.yaml      # driver: llm
└── coder.yaml          # 🆕 example: driver: openclaw
examples/               # 🆕 scenario examples (to show generality)
├── video_company/      # video production (the original scenario)
│   └── agents/
├── dev_team/           # software development team
│   └── agents/
└── ecommerce/          # e-commerce operations team
    └── agents/
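Complete configuration example for OpenClaw integration
# agents/coder.yaml (takes effect once placed in any agents/ directory)
name: coder
alias: 小码
role: full-stack engineer, responsible for implementing and debugging code
driver: openclaw
driver_config:
  endpoint: ${OPENCLAW_ENDPOINT}   # http://localhost:3100
  timeout: 600
  api_key: ${OPENCLAW_API_KEY}
model: claude-3-5-sonnet-20241022
temperature: 0.2
can_delegate: false
system_prompt: |
  You are 小码, a professional full-stack engineer.
  When you receive a task, you:
  1. Analyse the requirements and plan the implementation
  2. Use tools to write code and run tests
  3. Report the result when done
skills:
  - name: write_file
    description: Write a code file
    parameters:
      type: object
      properties:
        path: {type: string, description: file path}
        content: {type: string, description: file content}
      required: [path, content]
  - name: run_command
    description: Run a shell command
    parameters:
      type: object
      properties:
        cmd: {type: string}
      required: [cmd]
Then add one line to .env (and set OPENCLAW_API_KEY as well if your OpenClaw instance requires a key):
OPENCLAW_ENDPOINT=http://localhost:3100
After docker compose up, 小码 will carry out coding tasks for real through OpenClaw.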
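Implementation plan
| Phase | Work item | Priority |
|-------|-----------|----------|
| P0 | Add the drivers/ directory (base + llm + openclaw) | High |
| P0 | Add context_bus.py | High |
| P0 | Refactor runner.py → FlowEngine | High |
| P1 | Add driver/driver_config fields to agent_loader.py | High |
| P1 | Add mcp_server.py | Medium |
| P2 | Add the examples/ directory (dev_team / ecommerce) | Medium |
| P2 | Slim down web.py | Low |
If the maintainers agree with this direction, I can submit a full PR implementing all of the changes above. 🚀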