适用人群:准备从零开始构建 Agent 项目的开发者 前置知识:Agent 基础概念、LangGraph 使用经验 预计学习时间:4-5 小时
在开始编码前,明确以下问题:
- 目标:Agent 要解决什么问题?
- 用户:谁会使用这个 Agent?
- 输入:Agent 接收什么类型的输入?
- 输出:期望什么格式的输出?
- 工具:需要调用哪些外部工具/API?
- 约束:延迟、成本、安全要求?
- 扩展:未来可能需要哪些功能?
| 类型 | 特征 | 适合场景 |
|---|---|---|
| 问答型 | 单轮/多轮对话 | 客服、知识问答 |
| 任务型 | 完成特定任务 | 数据提取、格式转换 |
| 工作流型 | 多步骤流程 | 报告生成、数据分析 |
| 协作型 | 多 Agent 协作 | 复杂研究、创意生成 |
| 自主型 | 目标驱动 | 长期任务、探索性任务 |
项目需要什么?
├── 简单问答? → 直接 API 调用 + Prompt
├── 需要工具调用? → LangChain / OpenAI Function Calling
├── 需要多步骤工作流? → LangGraph
├── 需要多 Agent 协作? → LangGraph Supervisor / CrewAI
└── 需要企业级部署? → Deep Agents / 自定义 Harness
# 创建虚拟环境
python -m venv agent-env
source agent-env/bin/activate # Linux/Mac
# 安装核心依赖
pip install langgraph langchain langchain-openai
pip install langsmith python-dotenv
pip install fastapi uvicorn pydantic
pip install psycopg2-binary # PostgreSQL 支持
# 安装开发工具
pip install pytest pytest-asyncio
pip install black isort mypy
pip install ipythonmy-agent-project/
├── src/
│ ├── __init__.py
│ ├── agents/
│ │ ├── __init__.py
│ │ ├── researcher.py # 研究 Agent
│ │ ├── analyst.py # 分析 Agent
│ │ └── writer.py # 写作 Agent
│ ├── tools/
│ │ ├── __init__.py
│ │ ├── search.py # 搜索工具
│ │ └── file_ops.py # 文件操作
│ ├── workflows/
│ │ ├── __init__.py
│ │ ├── research_flow.py # 研究工作流
│ │ └── supervisor.py # 监督器
│ └── config/
│ ├── __init__.py
│ └── settings.py # 配置管理
├── tests/
│ ├── test_agents.py
│ ├── test_tools.py
│ └── test_workflows.py
├── app/
│ ├── main.py # FastAPI 入口
│ └── api.py # API 路由
├── docker/
│ ├── Dockerfile
│ └── docker-compose.yml
├── .env # 环境变量
├── pyproject.toml # 项目配置
└── README.md
# src/config/settings.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
openai_api_key: str
openai_model: str = "gpt-4o"
temperature: float = 0.7
database_url: str = "sqlite:///checkpoints.db"
langsmith_api_key: str = ""
langsmith_project: str = "my-agent"
max_iterations: int = 10
timeout_seconds: int = 300
class Config:
env_file = ".env"
@lru_cache()
def get_settings() -> Settings:
return Settings()# .env
OPENAI_API_KEY=sk-your-key-here
OPENAI_MODEL=gpt-4o
DATABASE_URL=postgresql://user:pass@localhost:5432/agent_db
LANGSMITH_API_KEY=lsv2_...
LANGSMITH_PROJECT=research-assistant┌─────────────────────────────────────────────────┐
│ 接口层 (API) │
│ FastAPI / LangServe / CLI / Web UI │
├─────────────────────────────────────────────────┤
│ 编排层 (Orchestration) │
│ LangGraph Workflow / Supervisor │
├─────────────────────────────────────────────────┤
│ Agent 层 │
│ Researcher Agent | Analyst Agent | Writer Agent│
├─────────────────────────────────────────────────┤
│ 工具层 (Tools) │
│ Search | Read | Write | Calculate | API Call │
├─────────────────────────────────────────────────┤
│ 基础设施层 │
│ LLM API | Database | Cache | Message Queue │
└─────────────────────────────────────────────────┘
from typing import TypedDict, List, Optional, Annotated
from langgraph.graph.message import add_messages
class ResearchState(TypedDict):
topic: str
requirements: dict
search_queries: List[str]
search_results: List[dict]
analysis_results: List[dict]
key_findings: List[str]
outline: dict
draft: str
final_report: str
current_step: str
iterations: int
errors: List[str]
messages: Annotated[list, add_messages]from typing import Optional
class AgentError(Exception):
def __init__(self, message: str, recoverable: bool = True):
self.message = message
self.recoverable = recoverable
super().__init__(self.message)
class ToolError(AgentError): pass
class LLMError(AgentError): pass
class TimeoutError(AgentError):
def __init__(self, message: str):
super().__init__(message, recoverable=False)# src/tools/search.py
from langchain_core.tools import tool
from typing import List
import httpx
@tool
def web_search(query: str, num_results: int = 5) -> str:
"""执行网络搜索,返回相关结果摘要"""
results = [
{"title": f"Result {i} for {query}", "snippet": f"Content about {query}..."}
for i in range(num_results)
]
return "\n".join([f"{r['title']}: {r['snippet']}" for r in results])
@tool
def read_file(file_path: str) -> str:
"""读取文件内容"""
try:
with open(file_path, 'r') as f:
return f.read()
except FileNotFoundError:
return f"Error: File {file_path} not found"
@tool
def write_file(file_path: str, content: str) -> str:
"""写入文件内容"""
with open(file_path, 'w') as f:
f.write(content)
return f"Successfully wrote to {file_path}"# src/agents/researcher.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langgraph.prebuilt import create_react_agent
from src.tools.search import web_search
def create_researcher_agent(model: str = "gpt-4o"):
llm = ChatOpenAI(model=model, temperature=0.3)
tools = [web_search]
prompt = ChatPromptTemplate.from_messages([
("system", """你是一个专业的研究助手。你的职责是:
1. 根据主题生成搜索查询
2. 执行搜索并收集信息
3. 整理和总结发现的关键信息
始终保持客观,引用来源,避免臆断。"""),
("human", "{input}"),
])
agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
return agent
# src/agents/analyst.py
def create_analyst_agent(model: str = "gpt-4o"):
llm = ChatOpenAI(model=model, temperature=0.2)
prompt = ChatPromptTemplate.from_messages([
("system", """你是一个数据分析专家。你的职责是:
1. 分析收集到的研究数据
2. 识别关键趋势和模式
3. 提取重要发现
4. 生成结构化的分析结果
使用逻辑推理,避免偏见,提供数据支持的观点。"""),
("human", "{input}"),
])
agent = create_react_agent(llm=llm, tools=[], prompt=prompt)
return agent
# src/agents/writer.py
def create_writer_agent(model: str = "gpt-4o"):
llm = ChatOpenAI(model=model, temperature=0.7)
prompt = ChatPromptTemplate.from_messages([
("system", """你是一个专业的技术写作者。你的职责是:
1. 根据大纲和分析结果撰写报告
2. 确保结构清晰、逻辑连贯
3. 使用专业但易懂的语言
4. 包含执行摘要、正文和结论
报告格式:Markdown,包含标题、章节、要点。"""),
("human", "{input}"),
])
agent = create_react_agent(llm=llm, tools=[], prompt=prompt)
return agent# src/workflows/research_flow.py
from typing import TypedDict, List
from langgraph.graph import StateGraph, START, END
from src.agents.researcher import create_researcher_agent
from src.agents.analyst import create_analyst_agent
from src.agents.writer import create_writer_agent
class ResearchState(TypedDict):
topic: str
research_results: str
analysis_results: str
final_report: str
errors: List[str]
def research_node(state: ResearchState) -> dict:
agent = create_researcher_agent()
result = agent.invoke({"input": f"研究主题:{state['topic']}"})
return {"research_results": result["messages"][-1].content}
def analysis_node(state: ResearchState) -> dict:
agent = create_analyst_agent()
result = agent.invoke({"input": f"分析以下研究结果:\n{state['research_results']}"})
return {"analysis_results": result["messages"][-1].content}
def writing_node(state: ResearchState) -> dict:
agent = create_writer_agent()
result = agent.invoke({"input": f"""基于以下分析结果撰写报告:
主题:{state['topic']}
分析:{state['analysis_results']}"""})
return {"final_report": result["messages"][-1].content}
def create_research_workflow():
workflow = StateGraph(ResearchState)
workflow.add_node("research", research_node)
workflow.add_node("analyze", analysis_node)
workflow.add_node("write", writing_node)
workflow.add_edge(START, "research")
workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "write")
workflow.add_edge("write", END)
return workflow.compile()# src/workflows/supervisor.py
from langgraph_supervisor import create_supervisor
from src.agents.researcher import create_researcher_agent
from src.agents.analyst import create_analyst_agent
from src.agents.writer import create_writer_agent
from langgraph.checkpoint.memory import MemorySaver
def create_supervisor_workflow():
researcher = create_researcher_agent()
analyst = create_analyst_agent()
writer = create_writer_agent()
supervisor = create_supervisor(
agents=[
{"name": "researcher", "agent": researcher},
{"name": "analyst", "agent": analyst},
{"name": "writer", "agent": writer},
],
model="gpt-4o",
prompt_template=(
"你是一个项目管理 supervisor。根据用户请求,"
"协调 researcher、analyst 和 writer 完成研究任务。"
),
checkpointer=MemorySaver(),
)
return supervisor# tests/test_tools.py
import pytest
from src.tools.search import web_search
def test_web_search():
result = web_search.invoke("Python programming")
assert "Python" in result
assert len(result) > 0
# tests/test_agents.py
import pytest
from src.agents.researcher import create_researcher_agent
@pytest.mark.asyncio
async def test_researcher_agent():
agent = create_researcher_agent()
result = await agent.ainvoke({"input": "研究 Python 3.12 的新特性"})
assert "messages" in result
assert len(result["messages"]) > 0
# tests/test_workflows.py
import pytest
from src.workflows.research_flow import create_research_workflow
def test_research_workflow():
app = create_research_workflow()
result = app.invoke({
"topic": "AI Agent 发展趋势",
"research_results": "",
"analysis_results": "",
"final_report": "",
"errors": []
})
assert "final_report" in result
assert len(result["final_report"]) > 0# tests/test_integration.py
import pytest
from src.workflows.supervisor import create_supervisor_workflow
@pytest.mark.asyncio
async def test_supervisor_end_to_end():
app = create_supervisor_workflow()
config = {"configurable": {"thread_id": "test-1"}}
result = await app.ainvoke(
{"input": "研究并撰写关于 AI Agent 的报告"},
config=config
)
assert result is not None
def test_workflow_with_checkpointer():
from langgraph.checkpoint.memory import MemorySaver
from src.workflows.research_flow import create_research_workflow
checkpointer = MemorySaver()
app = create_research_workflow()
config = {"configurable": {"thread_id": "test-checkpoint"}}
result1 = app.invoke({
"topic": "测试主题",
"research_results": "",
"analysis_results": "",
"final_report": "",
"errors": []
}, config=config)
assert "final_report" in result1# app/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from src.workflows.research_flow import create_research_workflow
from langgraph.checkpoint.postgres import PostgresSaver
import asyncio
app = FastAPI(title="Research Agent API", version="1.0.0")
class ResearchRequest(BaseModel):
topic: str
max_iterations: int = 10
class ResearchResponse(BaseModel):
topic: str
final_report: str
status: str
# 初始化工作流
checkpointer = PostgresSaver.from_conn_string("postgresql://...")
workflow = create_research_workflow()
app_instance = workflow.compile(checkpointer=checkpointer)
@app.post("/api/research", response_model=ResearchResponse)
async def run_research(request: ResearchRequest):
try:
import uuid
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = await app_instance.ainvoke({
"topic": request.topic,
"research_results": "",
"analysis_results": "",
"final_report": "",
"errors": []
}, config=config)
return ResearchResponse(
topic=request.topic,
final_report=result.get("final_report", ""),
status="completed"
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
return {"status": "healthy", "version": "1.0.0"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)# app/serve.py
from langserve import add_routes
from fastapi import FastAPI
from src.workflows.research_flow import create_research_workflow
app = FastAPI()
workflow = create_research_workflow().compile()
add_routes(
app,
workflow,
path="/research",
input_type=dict,
output_type=dict,
)# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ src/
COPY app/ app/
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]# docker-compose.yml
version: '3.8'
services:
agent-api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- DATABASE_URL=postgresql://postgres:postgres@db:5432/agent_db
depends_on:
- db
restart: unless-stopped
db:
image: postgres:16
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=agent_db
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
volumes:
postgres_data:- 环境变量配置(API Keys、数据库连接)
- 检查点持久化(PostgreSQL/Redis)
- 监控和日志(LangSmith、Prometheus)
- 错误处理和重试机制
- 速率限制和超时设置
- 安全审计和权限控制
- 备份和恢复策略
- 性能基准测试
import os
from langsmith import Client, traceable
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_API_KEY"] = "lsv2_..."
os.environ["LANGSMITH_PROJECT"] = "research-assistant"
client = Client()
@traceable(run_type="chain")
def research_step(topic: str) -> str:
"""带追踪的研究步骤"""
return f"Research results for {topic}"
@traceable(run_type="chain")
def analysis_step(research_results: str) -> str:
"""带追踪的分析步骤"""
return f"Analysis of results"# 1. 使用流式输出减少延迟
async def stream_workflow(app, input_data):
async for event in app.astream(input_data):
yield event
# 2. 缓存常见结果
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_search(query: str) -> str:
return web_search.invoke(query)
# 3. 并行执行独立任务
import asyncio
async def parallel_research(topics: list):
tasks = [research_topic(t) for t in topics]
results = await asyncio.gather(*tasks)
return results
# 4. 模型选择优化
MODEL_CONFIGS = {
"fast": {"model": "gpt-4o-mini", "temperature": 0.3},
"balanced": {"model": "gpt-4o", "temperature": 0.5},
"quality": {"model": "o1", "temperature": 0.0},
}| 指标 | 目标 | 监控方式 |
|---|---|---|
| 响应时间 | < 30s (P95) | LangSmith Traces |
| 成功率 | > 95% | 错误日志 |
| Token 消耗 | 预算内 | LangSmith Usage |
| 并发用户 | > 100 | 服务器监控 |
| 成本/请求 | < $0.50 | 账单追踪 |
构建一个完整的智能研究助手,能够:
- 接收用户研究主题
- 自动搜索和收集信息
- 分析数据并提取关键发现
- 生成结构化研究报告
- 支持人工审核和修改
# main.py - 完整的研究助手
from typing import TypedDict, List, Optional, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from langgraph.prebuilt import create_react_agent
import uuid
# === 工具定义 ===
@tool
def search_web(query: str) -> str:
"""搜索网络获取信息"""
return f"搜索结果: {query}"
@tool
def save_file(path: str, content: str) -> str:
"""保存内容到文件"""
return f"已保存到 {path}"
# === Agent 定义 ===
def create_researcher():
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
prompt = ChatPromptTemplate.from_messages([
("system", "你是研究专家,负责收集信息。"),
("human", "{input}"),
])
return create_react_agent(llm, [search_web], prompt)
def create_analyst():
llm = ChatOpenAI(model="gpt-4o", temperature=0.2)
prompt = ChatPromptTemplate.from_messages([
("system", "你是数据分析专家,负责提取关键发现。"),
("human", "{input}"),
])
return create_react_agent(llm, [], prompt)
def create_writer():
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
prompt = ChatPromptTemplate.from_messages([
("system", "你是技术写作者,负责撰写报告。"),
("human", "{input}"),
])
return create_react_agent(llm, [save_file], prompt)
# === 状态定义 ===
class ResearchState(TypedDict):
topic: str
research_results: str
analysis_results: str
draft: str
final_report: str
needs_review: bool
feedback: Optional[str]
messages: Annotated[list, add_messages]
# === 节点定义 ===
def research_node(state: ResearchState) -> dict:
agent = create_researcher()
result = agent.invoke({"input": f"研究主题:{state['topic']}"})
return {"research_results": result["messages"][-1].content}
def analysis_node(state: ResearchState) -> dict:
agent = create_analyst()
result = agent.invoke({"input": f"分析:{state['research_results']}"})
return {"analysis_results": result["messages"][-1].content}
def writing_node(state: ResearchState) -> dict:
agent = create_writer()
result = agent.invoke({"input": f"撰写报告,分析结果:{state['analysis_results']}"})
return {"draft": result["messages"][-1].content, "needs_review": True}
def review_node(state: ResearchState) -> dict:
# 人工审核节点 - 实际应用中这里会等待用户输入
if state.get("feedback"):
return {"draft": state["draft"] + f"\n\n根据反馈修改:{state['feedback']}", "needs_review": False}
return {"needs_review": False}
def finalize_node(state: ResearchState) -> dict:
return {"final_report": state["draft"]}
# === 条件路由 ===
def should_review(state: ResearchState) -> str:
if state.get("needs_review"):
return "review"
return "finalize"
# === 工作流构建 ===
def create_complete_workflow():
workflow = StateGraph(ResearchState)
workflow.add_node("research", research_node)
workflow.add_node("analyze", analysis_node)
workflow.add_node("write", writing_node)
workflow.add_node("review", review_node)
workflow.add_node("finalize", finalize_node)
workflow.add_edge(START, "research")
workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "write")
workflow.add_conditional_edges("write", should_review, {"review": "review", "finalize": "finalize"})
workflow.add_edge("review", "finalize")
workflow.add_edge("finalize", END)
checkpointer = MemorySaver()
return workflow.compile(checkpointer=checkpointer)
# === 使用示例 ===
if __name__ == "__main__":
app = create_complete_workflow()
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = app.invoke({
"topic": "AI Agent 在 2026 年的发展趋势",
"research_results": "",
"analysis_results": "",
"draft": "",
"final_report": "",
"needs_review": False,
"feedback": None,
"messages": []
}, config=config)
print("=== 最终报告 ===")
print(result["final_report"])=== 执行流程 ===
1. [research] 正在搜索 AI Agent 2026 趋势...
2. [analyze] 正在分析搜索结果...
3. [write] 正在撰写报告...
4. [review] 等待人工审核...
5. [finalize] 生成最终报告
=== 报告摘要 ===
# AI Agent 2026 发展趋势研究报告
## 执行摘要
2026 年 AI Agent 领域呈现以下关键趋势...
## 主要发现
1. 多 Agent 协作成为主流
2. 自主性显著提升
3. MCP 协议广泛采用
...
## 结论
AI Agent 正在从工具型助手向自主协作体演进...
问题:Agent 无法正确调用工具或返回错误结果。
解决:
# 1. 确保工具描述清晰
@tool
def search(query: str) -> str:
"""执行网络搜索。参数 query 是搜索关键词,返回相关结果摘要。"""
...
# 2. 添加重试逻辑
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def reliable_search(query: str) -> str:
return search.invoke(query)问题:Agent 陷入循环,不断重复相同操作。
解决:
# 1. 设置最大迭代次数
config = {"recursion_limit": 50}
# 2. 在状态中跟踪迭代
class State(TypedDict):
iteration_count: int
def node(state: State) -> dict:
if state["iteration_count"] > 10:
return {"error": "达到最大迭代次数"}
return {"iteration_count": state["iteration_count"] + 1}问题:对话历史超出模型上下文窗口。
解决:
# 1. 消息截断
def truncate_messages(messages: list, max_tokens: int = 4000) -> list:
total = sum(len(m.content) // 4 for m in messages)
if total > max_tokens:
return messages[-10:] # 保留最近 10 条
return messages
# 2. 摘要压缩
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
def compress_history(messages: list) -> list:
summary = llm.invoke(f"总结以下对话:{messages}")
return [SystemMessage(content=summary.content)]问题:多用户同时使用时出现状态混乱。
解决:
# 1. 使用唯一的 thread_id
import uuid
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
# 2. 使用持久化检查点
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(DB_URL)基于本模块内容,独立开发一个完整的 Agent 项目,要求:
- 至少包含 2 个 Agent
- 使用 LangGraph 编排工作流
- 实现检查点持久化
- 编写单元测试
- 部署为 FastAPI 服务
对你的 Agent 项目进行性能优化:
- 实现流式输出
- 添加结果缓存
- 比较优化前后的响应时间和成本
使用 LangSmith 创建一个监控面板:
- 追踪工作流执行
- 分析 Token 消耗
- 设置告警规则
改进 Agent 的错误处理:
- 实现自定义异常类
- 添加重试机制
- 设计优雅降级策略
- 需求分析完成,明确目标和约束
- 技术栈选择合理
- 项目结构清晰
- 状态设计完整
- 工具实现并测试
- Agent 实现并测试
- 工作流编排完成
- 检查点配置
- 单元测试覆盖核心逻辑
- 集成测试验证工作流
- 端到端测试完整流程
- 错误处理测试
- 性能基准测试
- Docker 镜像构建
- 环境变量配置
- 数据库连接验证
- API 文档生成
- 健康检查端点
- LangSmith 集成
- 日志收集配置
- 监控告警设置
- 备份策略实施
- 性能优化完成