Skip to content

🚀 新方案提案:SoloFlow —— 基于「任务流水线 + 轻量状态机」的通用AI一人公司落地框架 #2

@SonicBotMan

Description

@SonicBotMan

背景与动机

当前项目的设计非常完整(架构图、角色定义、Prompt 设计都很好),但缺少可运行的代码实现。参考 paperclipai/paperclip 的思路(org chart + heartbeat + ticket + budget),我想提出一个全新的、更通用、部署简单(无需外部数据库服务,一条命令即可启动)的落地方案:SoloFlow。

⚠️ 本方案不是照抄 paperclip,而是从不同视角切入:paperclip 是「公司操作系统」,重在治理和组织;SoloFlow 是「任务流水线编排器」,重在快速落地和个人可用。


核心思路:任务状态机 + YAML 驱动角色

与 paperclip 的根本区别

| 维度 | paperclip | SoloFlow (本方案) |
|---|---|---|
| 定位 | 公司操作系统,管理多公司多团队 | 个人创作流水线,1人+AI团队 |
| 部署 | Node.js + PostgreSQL + pnpm monorepo | 单文件 Python / 纯 Docker Compose |
| 上手时间 | 较长(需 pnpm 20+ 环境) | `docker compose up` 即可 |
| 角色定义 | 代码写死 + skills 目录 | YAML 文件,改配置即换角色 |
| 调度机制 | Heartbeat 心跳轮询 | 任务状态机(当前实现按顺序执行任务列表,DAG 依赖为后续扩展) |
| 记忆系统 | 会话持久化 | 偏好标签 + 置信度,SQLite 本地存储(不依赖向量数据库) |
| 核心抽象 | 员工/公司/预算 | 任务/节点/转移条件 |

SoloFlow 架构

用户 (老板)
  │
  ▼
┌──────────────────────────────────────────────────────┐
│                   SoloFlow Core                      │
│                                                      │
│  ┌─────────────┐    ┌──────────────────────────┐    │
│  │  CLI / Web  │───▶│    TaskFSM (状态机引擎)   │    │
│  │  一行命令   │    │  pending→running→done     │    │
│  └─────────────┘    └──────────┬───────────────┘    │
│                                │                     │
│         ┌──────────────────────┼──────────────┐      │
│         ▼                      ▼              ▼      │
│  ┌─────────────┐  ┌──────────────────┐  ┌─────────┐ │
│  │ AgentLoader │  │  MemoryStore     │  │ Router  │ │
│  │ YAML驱动    │  │  SQLite + 标签   │  │ 任务路由│ │
│  └─────────────┘  └──────────────────┘  └─────────┘ │
└──────────────────────────────────────────────────────┘
         │
         ▼
   ┌─────────────────────────────────────────┐
   │           Agent Pool (可插拔)            │
   │  小助(调度) 小点(热点) 小文(文案)        │
   │  小材(素材) 小剪(剪辑) 小营(营销)       │
   └─────────────────────────────────────────┘

具体代码实现

1. 项目结构

soloflow/
├── soloflow/
│   ├── __init__.py
│   ├── fsm.py            # 任务状态机核心
│   ├── agent_loader.py   # YAML驱动的Agent加载器
│   ├── router.py         # 任务路由,决定谁干活
│   ├── memory.py         # 偏好记忆系统
│   ├── runner.py         # 主运行循环
│   └── web.py            # 轻量Web界面 (FastAPI)
├── agents/
│   ├── base.yaml         # 公共配置
│   ├── assistant.yaml    # 小助 - 老板助理
│   ├── idea.yaml         # 小点 - 点子王
│   ├── writer.yaml       # 小文 - 文案师
│   ├── editor.yaml       # 小剪 - 剪辑师
│   └── publisher.yaml    # 小发 - 发布专家
├── docker-compose.yml
├── .env.example
└── main.py               # 入口,一个文件搞定

2. 任务状态机 soloflow/fsm.py

from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
import uuid, time, json
from pathlib import Path
import sqlite3

class TaskStatus(Enum):
    """Lifecycle states of a task.

    The string values are what gets written to the ``status`` column of the
    ``tasks`` table, so they must stay stable once data exists.
    """

    PENDING = "pending"            # initial state of every newly created Task
    ASSIGNED = "assigned"
    RUNNING = "running"
    WAITING_HUMAN = "waiting_human"  # key state: paused until a human confirms
    DONE = "done"
    FAILED = "failed"

def _short_task_id() -> str:
    """Return a short task id: the first 8 characters of a random UUID4 string."""
    # NOTE(review): 8 hex characters is only 32 bits of randomness and the id
    # is the table's PRIMARY KEY, so collisions become plausible after tens of
    # thousands of tasks.
    return str(uuid.uuid4())[:8]


@dataclass
class Task:
    """One unit of work handed to an agent; mirrors a row of the ``tasks`` table."""

    id: str = field(default_factory=_short_task_id)
    title: str = ""
    description: str = ""
    # Name of the agent that should carry out this task.
    agent: str = ""
    # Id of the parent task when this is a sub-task, otherwise None.
    parent_id: Optional[str] = None
    status: TaskStatus = TaskStatus.PENDING
    # Outcome text (e.g. the agent's reply); None until one is recorded.
    result: Optional[str] = None
    # Free-form extra data; persisted as JSON text.
    context: Dict[str, Any] = field(default_factory=dict)
    # Unix timestamps taken from time.time().
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)

class TaskFSM:
    """Lightweight task state machine persisted in SQLite (no PostgreSQL needed).

    Every status change goes through :meth:`transition`, which only allows
    the edges listed in ``TRANSITIONS``.
    """

    # current status -> statuses it may move to next
    TRANSITIONS = {
        TaskStatus.PENDING:       [TaskStatus.ASSIGNED, TaskStatus.FAILED],
        TaskStatus.ASSIGNED:      [TaskStatus.RUNNING, TaskStatus.FAILED],
        TaskStatus.RUNNING:       [TaskStatus.WAITING_HUMAN, TaskStatus.DONE, TaskStatus.FAILED],
        TaskStatus.WAITING_HUMAN: [TaskStatus.RUNNING, TaskStatus.FAILED],
        TaskStatus.DONE:          [],
        TaskStatus.FAILED:        [TaskStatus.PENDING],  # allows a retry
    }

    # Explicit column list shared by INSERT and SELECT so row tuples always
    # line up with _row_to_task, independent of the table's physical layout.
    _COLUMNS_SQL = ("id, title, description, agent, parent_id, status, "
                    "result, context, created_at, updated_at")
    _SELECT_SQL = f"SELECT {_COLUMNS_SQL} FROM tasks"

    def __init__(self, db_path: str = "soloflow.db"):
        """Open (or create) the SQLite database at *db_path* and ensure the schema."""
        # check_same_thread=False lets this connection be used from threads
        # other than the one that created it.
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self._init_db()

    def _init_db(self) -> None:
        """Create the ``tasks`` table if it does not exist yet."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id TEXT PRIMARY KEY,
                title TEXT,
                description TEXT,
                agent TEXT,
                parent_id TEXT,
                status TEXT,
                result TEXT,
                context TEXT,
                created_at REAL,
                updated_at REAL
            )
        """)
        self.conn.commit()

    @staticmethod
    def _row_to_task(row: tuple) -> Task:
        """Build a Task from a row selected with ``_SELECT_SQL``."""
        return Task(
            id=row[0], title=row[1], description=row[2], agent=row[3],
            parent_id=row[4], status=TaskStatus(row[5]), result=row[6],
            context=json.loads(row[7] or "{}"),
            created_at=row[8], updated_at=row[9]
        )

    def create(self, title: str, description: str, agent: str = "",
               parent_id: Optional[str] = None,
               context: Optional[dict] = None) -> Task:
        """Insert a new PENDING task and return it.

        Args:
            title: Short task title.
            description: Full task description.
            agent: Name of the agent the task is for.
            parent_id: Id of the parent task for sub-tasks.
            context: Extra data, stored as JSON (defaults to ``{}``).
        """
        task = Task(
            title=title, description=description,
            agent=agent, parent_id=parent_id,
            context=context or {}
        )
        self.conn.execute(
            f"INSERT INTO tasks ({self._COLUMNS_SQL}) VALUES (?,?,?,?,?,?,?,?,?,?)",
            (task.id, task.title, task.description, task.agent,
             task.parent_id, task.status.value, task.result,
             json.dumps(task.context), task.created_at, task.updated_at)
        )
        self.conn.commit()
        return task

    def transition(self, task_id: str, new_status: TaskStatus,
                   result: Optional[str] = None) -> Task:
        """Move a task to *new_status* and persist the change.

        Args:
            task_id: Id of the task to update.
            new_status: Target status; must be listed in ``TRANSITIONS`` for
                the task's current status.
            result: New result text. ``None`` keeps the stored result, so
                e.g. resuming a WAITING_HUMAN task does not erase the draft
                the human was reviewing.

        Returns:
            The updated Task.

        Raises:
            KeyError: if no task has this id.
            ValueError: if the transition is not allowed.
        """
        task = self.get(task_id)
        if new_status not in self.TRANSITIONS[task.status]:
            raise ValueError(
                f"Invalid transition: {task.status.value} -> {new_status.value}")
        task.status = new_status
        if result is not None:
            task.result = result
        task.updated_at = time.time()
        self.conn.execute(
            "UPDATE tasks SET status=?, result=?, updated_at=? WHERE id=?",
            (new_status.value, task.result, task.updated_at, task_id)
        )
        self.conn.commit()
        return task

    def get(self, task_id: str) -> Task:
        """Return the task with *task_id*; raise KeyError if it does not exist."""
        row = self.conn.execute(
            self._SELECT_SQL + " WHERE id=?", (task_id,)).fetchone()
        if row is None:
            raise KeyError(f"Task {task_id} not found")
        return self._row_to_task(row)

    def list_by_status(self, status: TaskStatus) -> List[Task]:
        """Return all tasks currently in *status*, oldest first."""
        rows = self.conn.execute(
            self._SELECT_SQL + " WHERE status=? ORDER BY created_at",
            (status.value,)
        ).fetchall()
        # Build Tasks straight from the rows instead of one extra query per row.
        return [self._row_to_task(row) for row in rows]

    def list_pending(self) -> List[Task]:
        """Return all PENDING tasks, oldest first."""
        return self.list_by_status(TaskStatus.PENDING)

3. YAML 驱动的 Agent 加载器 soloflow/agent_loader.py

import yaml
from pathlib import Path
from typing import Dict, Any

class AgentConfig:
    """Attribute view over one agent's (already merged) YAML configuration.

    Required keys: ``name``, ``alias``, ``role``, ``system_prompt``; a missing
    one raises KeyError (checked in that order). Optional keys and their
    defaults: ``skills`` -> [], ``can_delegate`` -> False,
    ``model`` -> "gpt-4o", ``temperature`` -> 0.7.
    """

    def __init__(self, data: dict):
        # Required fields, evaluated left to right.
        # alias is the display nickname (e.g. 「小助」); role is a short description.
        self.name, self.alias, self.role, self.system_prompt = (
            data["name"], data["alias"], data["role"], data["system_prompt"]
        )

        optional = data.get
        self.skills = optional("skills", [])            # external tools the agent may call
        self.can_delegate = optional("can_delegate", False)
        self.model = optional("model", "gpt-4o")        # each agent may use its own model
        self.temperature = optional("temperature", 0.7)

class AgentLoader:
    """从 agents/ 目录加载所有 YAML 配置,支持热重载"""

    def __init__(self, agents_dir: str = "agents"):
        self.agents_dir = Path(agents_dir)
        self._cache: Dict[str, AgentConfig] = {}
        self.reload()

    def reload(self):
        self._cache.clear()
        for yaml_file in self.agents_dir.glob("*.yaml"):
            if yaml_file.name == "base.yaml":
                continue
            data = yaml.safe_load(yaml_file.read_text())
            # 继承 base.yaml 公共配置
            base_path = self.agents_dir / "base.yaml"
            if base_path.exists():
                base = yaml.safe_load(base_path.read_text())
                merged = {**base, **data}
            else:
                merged = data
            cfg = AgentConfig(merged)
            self._cache[cfg.name] = cfg

    def get(self, name: str) -> AgentConfig:
        return self._cache[name]

    def all(self) -> Dict[str, AgentConfig]:
        return self._cache

4. Agent 配置示例 agents/assistant.yaml

# Agent definition for 「小助」, the boss's assistant and the single entry point
# for conversation. AgentLoader merges this file over agents/base.yaml (keys here
# win) and builds an AgentConfig from it; AgentConfig requires name, alias, role
# and system_prompt, the other keys are optional.
name: assistant      # key the runner looks up via loader.get("assistant")
alias: 小助          # display nickname
role: 老板助理,唯一对话入口
model: gpt-4o
temperature: 0.3     # lower than AgentConfig's 0.7 default
can_delegate: true   # NOTE(review): not read by any code shown in this proposal

system_prompt: |
  你是「小助」,老板的专属AI助理,也是这家AI公司的唯一对话入口。

  你的职责:
  1. 理解老板的意图,拆解成具体任务
  2. 把任务分配给合适的AI员工(点子王/文案师/剪辑师等)
  3. 汇总员工成果,向老板汇报
  4. 提取老板偏好,存入记忆系统

  分配规则:
  - 需要热点/创意 → 点子王(idea)
  - 需要写文案/脚本 → 文案师(writer)
  - 需要视频制作 → 剪辑师(editor)
  - 需要发布推广 → 发布专家(publisher)

  重要:每次汇报都要简洁,给老板足够的掌控感,关键节点必须请老板确认。

skills:   # NOTE(review): declared tool names only; nothing shown here wires them to implementations
  - name: create_task
    description: 创建新任务并分配给员工
  - name: get_task_status
    description: 查询任务进度
  - name: recall_preference
    description: 查询老板的偏好记忆

5. 偏好记忆系统 soloflow/memory.py

import sqlite3
import json
import time
from typing import List, Dict, Optional

class PreferenceMemory:
    """Per-agent preference memory built on tags plus a confidence score.

    Each preference is an ``(agent, category, value)`` triple, e.g.
    ``("editor", "视频节奏", "快节奏")``, carrying a confidence in ``[0, 1]``
    that feedback nudges up or down by a fixed delta. Plain SQLite through the
    stdlib ``sqlite3`` module; no vector database or other extra dependency.

    NOTE(review): no time-based decay is applied yet; ``updated_at`` is
    recorded but not used when ranking.
    """

    # Starting confidence for a preference seen for the first time, before
    # the first feedback delta is applied.
    DEFAULT_CONFIDENCE = 0.5

    def __init__(self, db_path: str = "soloflow.db"):
        """Open (or create) the SQLite database at *db_path* and ensure the table."""
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self._init_db()

    def _init_db(self) -> None:
        """Create the ``preferences`` table if it does not exist yet."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS preferences (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                agent TEXT NOT NULL,      -- 哪个员工学到的偏好
                category TEXT NOT NULL,   -- 偏好类别,如「视频节奏」
                value TEXT NOT NULL,      -- 偏好值,如「快节奏」
                confidence REAL DEFAULT 0.5,   -- 置信度 0-1
                evidence TEXT,            -- 来源证据(原始评语)
                updated_at REAL
            )
        """)
        self.conn.commit()

    @staticmethod
    def _clamp(confidence: float) -> float:
        """Limit *confidence* to the closed interval [0, 1]."""
        return min(1.0, max(0.0, confidence))

    def update(self, agent: str, category: str, value: str,
               evidence: str, delta: float = 0.1) -> None:
        """Record feedback on a preference by shifting its confidence by *delta*.

        A positive *delta* reinforces the preference, a negative one weakens
        it. A preference seen for the first time starts at
        ``DEFAULT_CONFIDENCE + delta``. The stored value is always clamped to
        [0, 1]. *evidence* (e.g. the boss's original remark) replaces any
        previously stored evidence.
        """
        now = time.time()
        row = self.conn.execute(
            "SELECT id, confidence FROM preferences WHERE agent=? AND category=? AND value=?",
            (agent, category, value)
        ).fetchone()

        if row:
            self.conn.execute(
                "UPDATE preferences SET confidence=?, evidence=?, updated_at=? WHERE id=?",
                (self._clamp(row[1] + delta), evidence, now, row[0])
            )
        else:
            # Clamp here as well: an unclamped 0.5 + delta could fall
            # outside [0, 1] for large deltas.
            self.conn.execute(
                "INSERT INTO preferences (agent, category, value, confidence, evidence, updated_at) VALUES (?,?,?,?,?,?)",
                (agent, category, value,
                 self._clamp(self.DEFAULT_CONFIDENCE + delta), evidence, now)
            )
        self.conn.commit()

    def recall(self, agent: str, top_k: int = 5,
               min_confidence: float = 0.3) -> List[Dict]:
        """Return up to *top_k* of *agent*'s preferences, highest confidence first.

        Only preferences with confidence strictly above *min_confidence* are
        returned. Each item is ``{"category", "value", "confidence"}``.
        """
        rows = self.conn.execute(
            "SELECT category, value, confidence FROM preferences "
            "WHERE agent=? AND confidence > ? ORDER BY confidence DESC LIMIT ?",
            (agent, min_confidence, top_k)
        ).fetchall()
        return [{"category": r[0], "value": r[1], "confidence": r[2]} for r in rows]

    def format_for_prompt(self, agent: str) -> str:
        """Render *agent*'s recalled preferences as bullet lines for a system prompt."""
        prefs = self.recall(agent)
        if not prefs:
            return "暂无偏好记录"
        lines = [f"- {p['category']}:{p['value']} (置信度 {p['confidence']*100:.0f}%)"
                 for p in prefs]
        return "\n".join(lines)

6. 主运行器 soloflow/runner.py

import asyncio
import json
import os

from openai import AsyncOpenAI

from .agent_loader import AgentLoader
from .fsm import TaskFSM, TaskStatus
from .memory import PreferenceMemory
from .router import Router

# Module-level async OpenAI client shared by every SoloFlowRunner call.
# NOTE(review): it is constructed at import time with no arguments, so it
# presumably takes its API key from the OPENAI_API_KEY environment variable
# (set in docker-compose.yml); importing this module without it may fail — confirm.
client = AsyncOpenAI()

class SoloFlowRunner:
    """Handle one user request: the assistant agent plans sub-tasks, then each one is run."""

    def __init__(self):
        # Honour DATABASE_PATH (set in docker-compose.yml) so task and
        # preference data land in the mounted data volume. Fall back to the
        # previous default file name when the variable is unset.
        db_path = os.environ.get("DATABASE_PATH", "soloflow.db")
        self.fsm = TaskFSM(db_path)
        self.loader = AgentLoader()
        self.memory = PreferenceMemory(db_path)
        self.router = Router(self.loader)

    async def run_task(self, task_id: str) -> str:
        """Execute one task with its assigned agent and return the agent's reply.

        The boss's remembered preferences for that agent are appended to the
        system prompt. The task ends in WAITING_HUMAN when the reply contains
        a confirmation marker, otherwise in DONE. If the model call raises,
        the task is marked FAILED and the exception is re-raised.
        """
        task = self.fsm.get(task_id)
        agent_cfg = self.loader.get(task.agent)

        # Inject the learned preferences into the system prompt (core differentiator).
        preference_context = self.memory.format_for_prompt(task.agent)
        system = agent_cfg.system_prompt + f"""

---
【老板偏好记忆 - 请在工作中自动应用】
{preference_context}
"""
        messages = [
            {"role": "system", "content": system},
            {"role": "user", "content": f"{task.title}\n\n{task.description}"}
        ]

        # TaskFSM has no PENDING -> RUNNING edge, so a freshly created task
        # must pass through ASSIGNED first, otherwise transition() raises.
        if task.status is TaskStatus.PENDING:
            self.fsm.transition(task_id, TaskStatus.ASSIGNED)
        self.fsm.transition(task_id, TaskStatus.RUNNING)

        try:
            response = await client.chat.completions.create(
                model=agent_cfg.model,
                messages=messages,
                temperature=agent_cfg.temperature,
            )
        except Exception as exc:
            # Don't leave the task stuck in RUNNING: record the failure, then propagate.
            self.fsm.transition(task_id, TaskStatus.FAILED,
                                f"{type(exc).__name__}: {exc}")
            raise
        # The SDK types message.content as optional (e.g. for refusals);
        # treat a missing reply as empty text.
        result = response.choices[0].message.content or ""

        # Pause for human confirmation when the agent asks for it.
        if "[需要确认]" in result or "[WAIT_HUMAN]" in result:
            self.fsm.transition(task_id, TaskStatus.WAITING_HUMAN, result)
        else:
            self.fsm.transition(task_id, TaskStatus.DONE, result)

        return result

    @staticmethod
    def _extract_tasks(plan: object) -> list:
        """Normalise the planner's parsed JSON into a list of task dicts.

        Accepts ``{"tasks": [...]}``, a single task object, or a bare list.
        Anything else yields an empty list instead of crashing the loop.
        """
        if isinstance(plan, dict):
            if "tasks" in plan:
                plan = plan["tasks"]
            elif "agent" in plan:
                plan = [plan]  # a single task returned as a bare object
            else:
                return []
        if not isinstance(plan, list):
            return []
        return [item for item in plan if isinstance(item, dict)]

    async def dispatch(self, user_input: str) -> str:
        """Main entry: user input -> assistant plans tasks -> run each -> merged report.

        Returns the agents' replies, each prefixed with 【alias】 and separated
        by blank lines. Planned tasks naming an agent that is not loaded are
        reported as skipped instead of raising KeyError.
        """
        assistant_cfg = self.loader.get("assistant")
        agents = self.loader.all()
        # Advertise the agents that are actually loaded, not a hard-coded list.
        worker_names = "/".join(name for name in agents if name != "assistant")
        # json_object mode only produces JSON objects, so ask for an object
        # with a "tasks" array rather than a bare array.
        format_hint = json.dumps(
            {"tasks": [{"agent": worker_names, "title": "...", "description": "..."}]},
            ensure_ascii=False,
        )
        plan_messages = [
            {"role": "system", "content": assistant_cfg.system_prompt},
            {"role": "user", "content": user_input},
            {"role": "system", "content": f"请以 JSON 格式返回任务列表:{format_hint}"}
        ]
        plan_resp = await client.chat.completions.create(
            model=assistant_cfg.model,
            messages=plan_messages,
            response_format={"type": "json_object"}
        )
        plan = json.loads(plan_resp.choices[0].message.content or "{}")
        tasks_data = self._extract_tasks(plan)

        # Create and run the tasks one after another.
        results = []
        for t in tasks_data:
            agent_name = t.get("agent")
            if not isinstance(agent_name, str) or agent_name not in agents:
                results.append(f"【跳过】未知员工 {agent_name!r}:{t.get('title', '')}")
                continue
            task = self.fsm.create(
                title=str(t.get("title", "")),
                description=str(t.get("description", "")),
                agent=agent_name
            )
            result = await self.run_task(task.id)
            results.append(f"【{agents[agent_name].alias}】\n{result}")

        return "\n\n".join(results)

7. 一键部署 docker-compose.yml

# Single-service stack that runs SoloFlow from the mounted source tree on Python 3.12.
# NOTE(review): the top-level "version" key is obsolete for Compose v2 (ignored
# with a warning); kept here for older tooling.
version: "3.9"
services:
  soloflow:
    image: python:3.12-slim
    working_dir: /app
    volumes:
      - .:/app                        # project source
      - soloflow_data:/app/data       # named volume intended for the SQLite database
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}      # interpolated from the shell or a .env file beside this file
      - DATABASE_PATH=/app/data/soloflow.db   # only effective if the app code reads this variable — verify
    # NOTE(review): main.py (not shown) must listen on 0.0.0.0:8000 for this mapping to work.
    ports:
      - "8000:8000"
    # Dependencies are pip-installed on every container start; a prebuilt
    # image would avoid the repeated download.
    command: >-
      sh -c "pip install -q fastapi uvicorn openai pyyaml &&
             python main.py"
volumes:
  soloflow_data:

8. 极简 Web UI soloflow/web.py

from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from .runner import SoloFlowRunner

# FastAPI application serving the JSON endpoints and the single-page UI.
app = FastAPI(title="SoloFlow - AI一人公司")
# One runner shared by all requests (and with it, one TaskFSM and one
# PreferenceMemory SQLite connection).
runner = SoloFlowRunner()

class UserInput(BaseModel):
    """JSON body of POST /chat: the boss's free-text instruction."""

    message: str

@app.post("/chat")
async def chat(body: UserInput):
    """Send the user's message through the assistant pipeline and return the merged reply."""
    return {"result": await runner.dispatch(body.message)}

@app.get("/tasks")
def list_tasks():
    """List PENDING tasks as lightweight JSON summaries.

    NOTE(review): dispatch() runs each task right after creating it, so in the
    normal flow this list is usually empty; consider exposing all tasks.
    """
    summaries = []
    for task in runner.fsm.list_pending():
        summaries.append({
            "id": task.id,
            "title": task.title,
            "status": task.status.value,
            "agent": task.agent,
        })
    return summaries

@app.get("/preferences/{agent}")
def get_preferences(agent: str):
    """Return the preferences PreferenceMemory.recall() yields for *agent*."""
    learned = runner.memory.recall(agent)
    return learned

@app.get("/", response_class=HTMLResponse)
def index():
    """Serve the single-page chat UI.

    The page POSTs the textarea content to /chat and shows the ``result``
    field of the JSON reply. HTTP errors and network failures are shown in
    the output box instead of leaving it stuck on the loading text, and the
    button is disabled while a request is in flight to prevent double sends.
    """
    return """
    <!DOCTYPE html><html><head><meta charset="utf-8"><title>SoloFlow</title>
    <style>body{font-family:sans-serif;max-width:800px;margin:40px auto;padding:20px}
    textarea{width:100%;height:80px;margin:10px 0}
    button{background:#4f46e5;color:#fff;padding:10px 24px;border:none;border-radius:6px;cursor:pointer}
    button:disabled{opacity:.5;cursor:wait}
    #output{white-space:pre-wrap;background:#f5f5f5;padding:16px;border-radius:6px;min-height:100px;margin-top:16px}</style>
    </head><body>
    <h1>🎬 SoloFlow - 你的AI一人公司</h1>
    <textarea id="input" placeholder="老板,请输入您的创作需求..."></textarea>
    <button id="send" onclick="send()">发送给小助</button>
    <div id="output">等待您的指令...</div>
    <script>
    async function send() {
      const msg = document.getElementById('input').value;
      const out = document.getElementById('output');
      const btn = document.getElementById('send');
      if (!msg.trim()) return;
      btn.disabled = true;
      out.textContent = '处理中...';
      try {
        const res = await fetch('/chat', {method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({message:msg})});
        if (!res.ok) {
          out.textContent = '请求失败 (HTTP ' + res.status + '): ' + await res.text();
          return;
        }
        const data = await res.json();
        out.textContent = data.result;
      } catch (err) {
        out.textContent = '网络错误: ' + err;
      } finally {
        btn.disabled = false;
      }
    }
    </script></body></html>
    """

与现有设计文档的对接

此方案完全沿用了你们已有的:

  • ✅ 7个AI员工角色(小助/小点/小文/小材/小剪/小营/小发)→ 每个对应一个 YAML 文件(上文目录示例列出了其中 5 个,小材/小营可按同样方式添加)
  • ✅ 唯一对话入口设计 → SoloFlowRunner.dispatch() 强制走小助
  • ✅ 偏好学习系统 → PreferenceMemory 实现置信度更新
  • ✅ 任务闭环 → TaskFSM 完整状态转移

快速开始(3步)

# 1. Clone the project
git clone https://github.com/SonicBotMan/ai-one-person-company.git && cd ai-one-person-company

# 2. Configure the API key (note: ">" overwrites any existing .env file)
echo "OPENAI_API_KEY=sk-xxx" > .env

# 3. Start everything with one command
docker compose up
# Then open http://localhost:8000

我可以贡献什么

如果维护者认为这个方向值得推进,我可以:

  • 提交完整代码 PR
  • 补充 agents/*.yaml 的完整 system prompt
  • 接入 MiniMax / Moonshot 等国内模型(降低用户成本)
  • 添加微信/飞书 webhook 接口

期待讨论!🚀

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions