背景与动机
当前项目设计非常完整(架构图、角色定义、Prompt设计都很好),但缺少可运行的代码实现。参考 paperclipai/paperclip 的思路(org chart + heartbeat + ticket + budget),我想提出一个全新的、更加普世、零依赖部署的落地方案:SoloFlow。
⚠️ 本方案不是照抄 paperclip,而是从不同视角切入:paperclip 是「公司操作系统」,重在治理和组织;SoloFlow 是「任务流水线编排器」,重在快速落地和个人可用。
核心思路:任务状态机 + YAML 驱动角色
与 paperclip 的根本区别
| 维度 |
paperclip |
SoloFlow (本方案) |
| 定位 |
公司操作系统,管理多公司多团队 |
个人创作流水线,1人+AI团队 |
| 部署 |
Node.js + PostgreSQL + pnpm monorepo |
单文件 Python / 纯 Docker Compose |
| 上手时间 |
较长(需 pnpm 20+ 环境) |
docker compose up 即可 |
| 角色定义 |
代码写死 + skills 目录 |
YAML 文件,改配置即换角色 |
| 调度机制 |
Heartbeat 心跳轮询 |
有向无环图(DAG)任务状态机 |
| 记忆系统 |
会话持久化 |
偏好向量 + SQLite 本地存储 |
| 核心抽象 |
员工/公司/预算 |
任务/节点/转移条件 |
SoloFlow 架构
用户 (老板)
│
▼
┌──────────────────────────────────────────────────────┐
│ SoloFlow Core │
│ │
│ ┌─────────────┐ ┌──────────────────────────┐ │
│ │ CLI / Web │───▶│ TaskFSM (状态机引擎) │ │
│ │ 一行命令 │ │ pending→running→done │ │
│ └─────────────┘ └──────────┬───────────────┘ │
│ │ │
│ ┌──────────────────────┼──────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌──────────────────┐ ┌─────────┐ │
│ │ AgentLoader │ │ MemoryStore │ │ Router │ │
│ │ YAML驱动 │ │ SQLite + 向量 │ │ 任务路由│ │
│ └─────────────┘ └──────────────────┘ └─────────┘ │
└──────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ Agent Pool (可插拔) │
│ 小助(调度) 小点(热点) 小文(文案) │
│ 小材(素材) 小剪(剪辑) 小营(营销) │
└─────────────────────────────────────────┘
具体代码实现
1. 项目结构
soloflow/
├── soloflow/
│ ├── __init__.py
│ ├── fsm.py # 任务状态机核心
│ ├── agent_loader.py # YAML驱动的Agent加载器
│ ├── router.py # 任务路由,决定谁干活
│ ├── memory.py # 偏好记忆系统
│ ├── runner.py # 主运行循环
│ └── web.py # 轻量Web界面 (FastAPI)
├── agents/
│ ├── base.yaml # 公共配置
│ ├── assistant.yaml # 小助 - 老板助理
│ ├── idea.yaml # 小点 - 点子王
│ ├── writer.yaml # 小文 - 文案师
│ ├── editor.yaml # 小剪 - 剪辑师
│ └── publisher.yaml # 小发 - 发布专家
├── docker-compose.yml
├── .env.example
└── main.py # 入口,一个文件搞定
2. 任务状态机 soloflow/fsm.py
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
import uuid, time, json
from pathlib import Path
import sqlite3
class TaskStatus(Enum):
PENDING = "pending"
ASSIGNED = "assigned"
RUNNING = "running"
WAITING_HUMAN = "waiting_human" # 关键:需要人类确认时暂停
DONE = "done"
FAILED = "failed"
@dataclass
class Task:
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
title: str = ""
description: str = ""
agent: str = "" # 分配给哪个 agent
parent_id: Optional[str] = None # 支持子任务
status: TaskStatus = TaskStatus.PENDING
result: Optional[str] = None
context: Dict[str, Any] = field(default_factory=dict)
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
class TaskFSM:
"""轻量任务状态机,存储在 SQLite,无需 PostgreSQL"""
TRANSITIONS = {
TaskStatus.PENDING: [TaskStatus.ASSIGNED, TaskStatus.FAILED],
TaskStatus.ASSIGNED: [TaskStatus.RUNNING, TaskStatus.FAILED],
TaskStatus.RUNNING: [TaskStatus.WAITING_HUMAN, TaskStatus.DONE, TaskStatus.FAILED],
TaskStatus.WAITING_HUMAN: [TaskStatus.RUNNING, TaskStatus.FAILED],
TaskStatus.DONE: [],
TaskStatus.FAILED: [TaskStatus.PENDING], # 支持重试
}
def __init__(self, db_path: str = "soloflow.db"):
self.conn = sqlite3.connect(db_path, check_same_thread=False)
self._init_db()
def _init_db(self):
self.conn.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
title TEXT,
description TEXT,
agent TEXT,
parent_id TEXT,
status TEXT,
result TEXT,
context TEXT,
created_at REAL,
updated_at REAL
)
""")
self.conn.commit()
def create(self, title: str, description: str, agent: str = "",
parent_id: str = None, context: dict = None) -> Task:
task = Task(
title=title, description=description,
agent=agent, parent_id=parent_id,
context=context or {}
)
self.conn.execute(
"INSERT INTO tasks VALUES (?,?,?,?,?,?,?,?,?,?)",
(task.id, task.title, task.description, task.agent,
task.parent_id, task.status.value, task.result,
json.dumps(task.context), task.created_at, task.updated_at)
)
self.conn.commit()
return task
def transition(self, task_id: str, new_status: TaskStatus,
result: str = None) -> Task:
task = self.get(task_id)
allowed = self.TRANSITIONS[task.status]
if new_status not in allowed:
raise ValueError(f"Invalid transition: {task.status} → {new_status}")
task.status = new_status
task.result = result
task.updated_at = time.time()
self.conn.execute(
"UPDATE tasks SET status=?, result=?, updated_at=? WHERE id=?",
(new_status.value, result, task.updated_at, task_id)
)
self.conn.commit()
return task
def get(self, task_id: str) -> Task:
row = self.conn.execute(
"SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row: raise KeyError(f"Task {task_id} not found")
return Task(
id=row[0], title=row[1], description=row[2], agent=row[3],
parent_id=row[4], status=TaskStatus(row[5]), result=row[6],
context=json.loads(row[7] or "{}"),
created_at=row[8], updated_at=row[9]
)
def list_pending(self) -> List[Task]:
rows = self.conn.execute(
"SELECT * FROM tasks WHERE status='pending' ORDER BY created_at"
).fetchall()
return [self.get(r[0]) for r in rows]
3. YAML 驱动的 Agent 加载器 soloflow/agent_loader.py
import yaml
from pathlib import Path
from typing import Dict, Any
class AgentConfig:
def __init__(self, data: dict):
self.name = data["name"]
self.alias = data["alias"] # 花名,如「小助」
self.role = data["role"] # 角色描述
self.system_prompt = data["system_prompt"]
self.skills = data.get("skills", []) # 可调用的外部工具
self.can_delegate = data.get("can_delegate", False)
self.model = data.get("model", "gpt-4o") # 支持不同Agent用不同模型
self.temperature = data.get("temperature", 0.7)
class AgentLoader:
"""从 agents/ 目录加载所有 YAML 配置,支持热重载"""
def __init__(self, agents_dir: str = "agents"):
self.agents_dir = Path(agents_dir)
self._cache: Dict[str, AgentConfig] = {}
self.reload()
def reload(self):
self._cache.clear()
for yaml_file in self.agents_dir.glob("*.yaml"):
if yaml_file.name == "base.yaml":
continue
data = yaml.safe_load(yaml_file.read_text())
# 继承 base.yaml 公共配置
base_path = self.agents_dir / "base.yaml"
if base_path.exists():
base = yaml.safe_load(base_path.read_text())
merged = {**base, **data}
else:
merged = data
cfg = AgentConfig(merged)
self._cache[cfg.name] = cfg
def get(self, name: str) -> AgentConfig:
return self._cache[name]
def all(self) -> Dict[str, AgentConfig]:
return self._cache
4. Agent 配置示例 agents/assistant.yaml
name: assistant
alias: 小助
role: 老板助理,唯一对话入口
model: gpt-4o
temperature: 0.3
can_delegate: true
system_prompt: |
你是「小助」,老板的专属AI助理,也是这家AI公司的唯一对话入口。
你的职责:
1. 理解老板的意图,拆解成具体任务
2. 把任务分配给合适的AI员工(点子王/文案师/剪辑师等)
3. 汇总员工成果,向老板汇报
4. 提取老板偏好,存入记忆系统
分配规则:
- 需要热点/创意 → 点子王(idea)
- 需要写文案/脚本 → 文案师(writer)
- 需要视频制作 → 剪辑师(editor)
- 需要发布推广 → 发布专家(publisher)
重要:每次汇报都要简洁,给老板足够的掌控感,关键节点必须请老板确认。
skills:
- name: create_task
description: 创建新任务并分配给员工
- name: get_task_status
description: 查询任务进度
- name: recall_preference
description: 查询老板的偏好记忆
5. 偏好记忆系统 soloflow/memory.py
import sqlite3
import json
import time
from typing import List, Dict, Optional
class PreferenceMemory:
"""
核心创新点:基于「标签+置信度+衰减」的偏好学习系统。
不依赖向量数据库,纯 SQLite 实现,0 额外依赖。
"""
def __init__(self, db_path: str = "soloflow.db"):
self.conn = sqlite3.connect(db_path, check_same_thread=False)
self._init_db()
def _init_db(self):
self.conn.execute("""
CREATE TABLE IF NOT EXISTS preferences (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent TEXT NOT NULL, -- 哪个员工学到的偏好
category TEXT NOT NULL, -- 偏好类别,如「视频节奏」
value TEXT NOT NULL, -- 偏好值,如「快节奏」
confidence REAL DEFAULT 0.5, -- 置信度 0-1
evidence TEXT, -- 来源证据(原始评语)
updated_at REAL
)
""")
self.conn.commit()
def update(self, agent: str, category: str, value: str,
evidence: str, delta: float = 0.1):
"""贝叶斯式置信度更新:正向反馈+delta,负向反馈-delta"""
row = self.conn.execute(
"SELECT id, confidence FROM preferences WHERE agent=? AND category=? AND value=?",
(agent, category, value)
).fetchone()
if row:
new_conf = min(1.0, max(0.0, row[1] + delta))
self.conn.execute(
"UPDATE preferences SET confidence=?, evidence=?, updated_at=? WHERE id=?",
(new_conf, evidence, time.time(), row[0])
)
else:
self.conn.execute(
"INSERT INTO preferences (agent, category, value, confidence, evidence, updated_at) VALUES (?,?,?,?,?,?)",
(agent, category, value, 0.5 + delta, evidence, time.time())
)
self.conn.commit()
def recall(self, agent: str, top_k: int = 5) -> List[Dict]:
"""返回该员工已学到的置信度最高的偏好,注入到 system prompt"""
rows = self.conn.execute(
"SELECT category, value, confidence FROM preferences "
"WHERE agent=? AND confidence > 0.3 ORDER BY confidence DESC LIMIT ?",
(agent, top_k)
).fetchall()
return [{"category": r[0], "value": r[1], "confidence": r[2]} for r in rows]
def format_for_prompt(self, agent: str) -> str:
"""格式化偏好为可注入 prompt 的文本"""
prefs = self.recall(agent)
if not prefs:
return "暂无偏好记录"
lines = [f"- {p['category']}:{p['value']} (置信度 {p['confidence']*100:.0f}%)"
for p in prefs]
return "\n".join(lines)
6. 主运行器 soloflow/runner.py
import asyncio
from openai import AsyncOpenAI
from .fsm import TaskFSM, TaskStatus
from .agent_loader import AgentLoader
from .memory import PreferenceMemory
from .router import Router
import json
client = AsyncOpenAI()
class SoloFlowRunner:
def __init__(self):
self.fsm = TaskFSM()
self.loader = AgentLoader()
self.memory = PreferenceMemory()
self.router = Router(self.loader)
async def run_task(self, task_id: str) -> str:
task = self.fsm.get(task_id)
agent_cfg = self.loader.get(task.agent)
# 动态注入偏好记忆到 system prompt(核心差异点)
preference_context = self.memory.format_for_prompt(task.agent)
system = agent_cfg.system_prompt + f"""
---
【老板偏好记忆 - 请在工作中自动应用】
{preference_context}
"""
messages = [
{"role": "system", "content": system},
{"role": "user", "content": f"{task.title}\n\n{task.description}"}
]
self.fsm.transition(task_id, TaskStatus.RUNNING)
response = await client.chat.completions.create(
model=agent_cfg.model,
messages=messages,
temperature=agent_cfg.temperature,
)
result = response.choices[0].message.content
# 检查是否需要人工确认
if "[需要确认]" in result or "[WAIT_HUMAN]" in result:
self.fsm.transition(task_id, TaskStatus.WAITING_HUMAN, result)
else:
self.fsm.transition(task_id, TaskStatus.DONE, result)
return result
async def dispatch(self, user_input: str) -> str:
"""主入口:用户输入 → 小助拆解 → 分配任务 → 汇报结果"""
# 先让小助理解意图
assistant_cfg = self.loader.get("assistant")
plan_messages = [
{"role": "system", "content": assistant_cfg.system_prompt},
{"role": "user", "content": user_input},
{"role": "system", "content": "请以 JSON 格式返回任务列表:[{\"agent\": \"idea/writer/editor/publisher\", \"title\": \"...\", \"description\": \"...\"}]"}
]
plan_resp = await client.chat.completions.create(
model=assistant_cfg.model,
messages=plan_messages,
response_format={"type": "json_object"}
)
plan = json.loads(plan_resp.choices[0].message.content)
tasks_data = plan.get("tasks", plan) if isinstance(plan, dict) else plan
# 创建并执行任务
results = []
for t in tasks_data:
task = self.fsm.create(
title=t["title"],
description=t["description"],
agent=t["agent"]
)
result = await self.run_task(task.id)
results.append(f"【{self.loader.get(t['agent']).alias}】\n{result}")
return "\n\n".join(results)
7. 一键部署 docker-compose.yml
version: "3.9"
services:
soloflow:
image: python:3.12-slim
working_dir: /app
volumes:
- .:/app
- soloflow_data:/app/data
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- DATABASE_PATH=/app/data/soloflow.db
ports:
- "8000:8000"
command: >-
sh -c "pip install -q fastapi uvicorn openai pyyaml &&
python main.py"
volumes:
soloflow_data:
8. 极简 Web UI soloflow/web.py
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from .runner import SoloFlowRunner
app = FastAPI(title="SoloFlow - AI一人公司")
runner = SoloFlowRunner()
class UserInput(BaseModel):
message: str
@app.post("/chat")
async def chat(body: UserInput):
result = await runner.dispatch(body.message)
return {"result": result}
@app.get("/tasks")
def list_tasks():
tasks = runner.fsm.list_pending()
return [{"id": t.id, "title": t.title, "status": t.status.value, "agent": t.agent}
for t in tasks]
@app.get("/preferences/{agent}")
def get_preferences(agent: str):
return runner.memory.recall(agent)
@app.get("/", response_class=HTMLResponse)
def index():
return """
<!DOCTYPE html><html><head><title>SoloFlow</title>
<style>body{font-family:sans-serif;max-width:800px;margin:40px auto;padding:20px}
textarea{width:100%;height:80px;margin:10px 0}
button{background:#4f46e5;color:#fff;padding:10px 24px;border:none;border-radius:6px;cursor:pointer}
#output{white-space:pre-wrap;background:#f5f5f5;padding:16px;border-radius:6px;min-height:100px;margin-top:16px}</style>
</head><body>
<h1>🎬 SoloFlow - 你的AI一人公司</h1>
<textarea id="input" placeholder="老板,请输入您的创作需求..."></textarea>
<button onclick="send()">发送给小助</button>
<div id="output">等待您的指令...</div>
<script>
async function send() {
const msg = document.getElementById('input').value;
document.getElementById('output').textContent = '处理中...';
const res = await fetch('/chat', {method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({message:msg})});
const data = await res.json();
document.getElementById('output').textContent = data.result;
}
</script></body></html>
"""
与现有设计文档的对接
此方案完全沿用了你们已有的:
- ✅ 7个AI员工角色(小助/小点/小文/小材/小剪/小营/小发)→ 每个对应一个 YAML 文件
- ✅ 唯一对话入口设计 → Runner.dispatch() 强制走小助
- ✅ 偏好学习系统 → PreferenceMemory 实现置信度更新
- ✅ 任务闭环 → TaskFSM 完整状态转移
快速开始(3步)
# 1. 克隆项目
git clone https://github.com/SonicBotMan/ai-one-person-company.git && cd ai-one-person-company
# 2. 配置 API Key
echo "OPENAI_API_KEY=sk-xxx" > .env
# 3. 一键启动
docker compose up
# 打开 http://localhost:8000
我可以贡献什么
如果维护者认为这个方向值得推进,我可以:
期待讨论!🚀
背景与动机
当前项目设计非常完整(架构图、角色定义、Prompt设计都很好),但缺少可运行的代码实现。参考 paperclipai/paperclip 的思路(org chart + heartbeat + ticket + budget),我想提出一个全新的、更加普世、零依赖部署的落地方案:SoloFlow。
核心思路:任务状态机 + YAML 驱动角色
与 paperclip 的根本区别
docker compose up即可SoloFlow 架构
具体代码实现
1. 项目结构
2. 任务状态机
soloflow/fsm.py3. YAML 驱动的 Agent 加载器
soloflow/agent_loader.py4. Agent 配置示例
agents/assistant.yaml5. 偏好记忆系统
soloflow/memory.py6. 主运行器
soloflow/runner.py7. 一键部署
docker-compose.yml8. 极简 Web UI
soloflow/web.py与现有设计文档的对接
此方案完全沿用了你们已有的:
快速开始(3步)
我可以贡献什么
如果维护者认为这个方向值得推进,我可以:
agents/*.yaml的完整 system prompt期待讨论!🚀