【高级应用】Day7:Multi-Agent实战–构建高效的AI协作系统
章节导语
一个AI Agent能力有限,但如果多个Agent协作呢?就像一个团队,有人负责规划,有人负责执行,有人负责检查。
Multi-Agent不是简单地把多个Agent串起来,而是让它们有分工、有协作、有通信。真正的协作是1+1>2。
本文通过3个实战项目,系统讲解Multi-Agent的协作模式和代码实现。
一、前置说明
1.1 学习路径
| 阶段 | 内容 |
|---|---|
| 基础 | Agent架构(Day1) |
| 进阶 | 多Agent协作 |
1.2 读者需要的基础
- Python面向对象:类、继承、多态
- Agent理解:知道什么是Agent、Tool
- API调用:会调用LLM
1.3 学习目标
学完本文,你将能够:
- 理解Multi-Agent的3种协作模式
- 实现Agent间的通信机制
- 构建一个完整的多Agent系统
二、Multi-Agent核心概念
2.1 为什么需要Multi-Agent
单Agent的局限性:
- 能力单一,什么都做但什么都做不精
- 上下文有限,复杂任务处理不过来
- 容易出错,没有复查机制
Multi-Agent的优势:
- 分工专业:专家Agent做专业的事
- 并行处理:多个Agent同时工作
- 互相校验:Agent之间可以检查彼此的工作
- 可扩展:随时添加新的Agent
2.2 协作模式

| 模式 | 特点 | 适用场景 |
|---|---|---|
| 顺序执行 | A→B→C | 流水线处理 |
| 并行执行 | A/B/C同时 | 多路查询 |
| 层次协作 | Manager调度 | 复杂任务分解 |
2.3 Agent通信机制
Multi-Agent的核心是通信。常见方式:
- 共享消息队列:Agent之间通过队列传递消息
- 共享内存:共用一个存储,读取他人结果
- 直接调用:Agent直接调用其他Agent的方法
三、环境配置
3.1 安装依赖
pip install openai anthropic
# 建议安装版本控制
pip install gitpython
3.2 API Key配置
# .env
OPENAI_API_KEY=sk-xxxx
ANTHROPIC_API_KEY=sk-ant-xxxx
四、基础架构:Agent基类
4.1 设计思路
先设计一个Agent基类,包含通用能力:
- 消息处理
- 状态管理
- 工具注册
- 通信接口
4.2 代码实现
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
class AgentState(Enum):
"""Agent状态"""
IDLE = "idle"
WORKING = "working"
WAITING = "waiting"
DONE = "done"
ERROR = "error"
@dataclass
class Message:
"""消息结构"""
sender: str # 发送者
receiver: str # 接收者(可选,None表示广播)
content: Any # 消息内容
msg_type: str = "text" # 消息类型
class BaseAgent(ABC):
"""Agent基类
所有Agent的父类,定义通用接口
"""
def __init__(self, name: str):
self.name = name
self.state = AgentState.IDLE
self.tools = {} # 注册的工具
self.context = {} # 共享上下文
self.inbox = [] # 收件箱
def register_tool(self, name: str, func: callable):
"""注册工具
参数:
name: 工具名
func: 工具函数
"""
self.tools[name] = func
print(f"🔧 [{self.name}] 注册工具: {name}")
def send_message(self, receiver: str, content: Any, msg_type: str = "text"):
"""发送消息
参数:
receiver: 接收者名称
content: 消息内容
msg_type: 消息类型
"""
msg = Message(
sender=self.name,
receiver=receiver,
content=content,
msg_type=msg_type
)
# 实际发送需要通过消息队列或Manager
return msg
def receive_message(self, message: Message):
"""接收消息"""
self.inbox.append(message)
print(f"📩 [{self.name}] 收到消息 from {message.sender}")
@abstractmethod
def think(self, message: Message) -> Optional[Message]:
"""思考:处理消息,返回回复或新消息
这是抽象方法,子类必须实现
"""
pass
def run(self):
"""运行Agent"""
print(f"🚀 [{self.name}] 启动")
self.state = AgentState.WORKING
# 处理收件箱
while self.inbox:
message = self.inbox.pop(0)
response = self.think(message)
if response:
self.send_message(response.receiver, response.content)
self.state = AgentState.DONE
print(f"✅ [{self.name}] 完成")
4.3 小结
BaseAgent定义了通用接口,让具体Agent专注于实现think方法。
五、实战1:代码审查团队
5.1 需求分析
构建一个代码审查Multi-Agent系统:
- Planner Agent:分析代码,确定审查重点
- Reviewer Agent:执行具体审查
- Summarizer Agent:汇总审查结果
5.2 代码实现
from openai import OpenAI
client = OpenAI()
class CodeReviewer(BaseAgent):
"""代码审查Agent"""
def __init__(self, name="Reviewer"):
super().__init__(name)
self.register_tool("review_code", self.review_code)
def review_code(self, code: str) -> str:
"""审查代码
实际调用LLM进行代码审查
"""
prompt = f"""请审查以下Python代码,检查:
1. 潜在bug
2. 性能问题
3. 安全漏洞
4. 代码风格
代码:
```python
{code}
```
请分点回答:"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
def think(self, message: Message) -> Optional[Message]:
"""处理审查请求"""
if message.msg_type == "review_request":
code = message.content.get("code", "")
result = self.review_code(code)
return Message(
sender=self.name,
receiver=message.sender,
content={"review": result},
msg_type="review_result"
)
return None
class CodeSummarizer(BaseAgent):
"""结果汇总Agent"""
def __init__(self, name="Summarizer"):
super().__init__(name)
def summarize_reviews(self, reviews: List[Dict]) -> str:
"""汇总多个审查结果"""
prompt = f"""请汇总以下代码审查结果,给出一份综合报告:
{chr(10).join([f"审查{i+1}:{r.get('review', '')}" for i, r in enumerate(reviews)])}
报告要求:
1. 问题分类(严重/中等/建议)
2. 修复优先级
3. 总体评价"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
def think(self, message: Message) -> Optional[Message]:
if message.msg_type == "summarize":
reviews = message.content.get("reviews", [])
summary = self.summarize_reviews(reviews)
return Message(
sender=self.name,
receiver=message.sender,
content={"summary": summary},
msg_type="final_report"
)
return None
class CodeReviewTeam:
"""代码审查团队
协调多个Agent工作
"""
def __init__(self):
self.planner = BaseAgent("Planner")
self.reviewer = CodeReviewer("Reviewer")
self.summarizer = CodeSummarizer("Summarizer")
self.agents = [self.planner, self.reviewer, self.summarizer]
def review(self, code: str) -> str:
"""执行代码审查"""
print(f"\n{'='*60}")
print("代码审查团队启动")
print("="*60)
# 1. Planner分析代码
print("\n📋 [Planner] 分析代码结构...")
# 2. Reviewer审查
print("🔍 [Reviewer] 执行代码审查...")
review_result = self.reviewer.review_code(code)
# 3. Summarizer汇总
print("📝 [Summarizer] 汇总审查结果...")
summary = self.summarizer.summarize_reviews([{"review": review_result}])
return summary
# 测试
if __name__ == "__main__":
team = CodeReviewTeam()
sample_code = """
def calculate_user_score(user_id, items):
total = 0
for item in items:
total += item['price'] * item['quantity']
# 这里可能有问题
if user_id in items:
total = total * 0.9
return total
"""
result = team.review(sample_code)
print("\n" + "="*60)
print("最终审查报告:")
print("="*60)
print(result)
5.3 运行结果
$ python code_review_team.py
============================================================
代码审查团队启动
============================================================
📋 [Planner] 分析代码结构...
🔍 [Reviewer] 执行代码审查...
📝 [Summarizer] 汇总审查结果...
============================================================
最终审查报告:
============================================================
## 综合代码审查报告
### 问题分类
**严重问题:**
1. 逻辑错误:`user_id in items` 这里items是商品列表,不是用户ID列表
**中等问题:**
1. 变量命名不清晰
2. 缺少类型注解
**建议:**
1. 可以用sum()函数简化循环
5.4 小结
代码审查团队展示了顺序协作模式:Planner → Reviewer → Summarizer。
六、实战2:并行研究团队
6.1 需求分析
一个研究任务,需要同时从多个角度分析:
- Web Researcher:搜索网络资料
- Data Analyst:分析数据
- Expert Interviewer:模拟专家访谈
6.2 代码实现

import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
class WebResearcher(BaseAgent):
"""网络研究Agent"""
def __init__(self, name="WebResearcher"):
super().__init__(name)
def research(self, topic: str) -> Dict:
"""搜索网络资料"""
prompt = f"""请研究以下主题,提供详细信息:
主题:{topic}
请提供:
1. 核心概念解释
2. 最新发展动态
3. 关键数据和案例
4. 参考来源"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return {"source": "web", "content": response.choices[0].message.content}
def think(self, message: Message) -> Optional[Message]:
if message.msg_type == "research":
topic = message.content.get("topic", "")
result = self.research(topic)
return Message(
sender=self.name,
receiver=message.sender,
content=result,
msg_type="research_result"
)
return None
class DataAnalyst(BaseAgent):
"""数据分析Agent"""
def __init__(self, name="DataAnalyst"):
super().__init__(name)
def analyze(self, topic: str) -> Dict:
"""分析数据"""
prompt = f"""请分析以下主题的相关数据:
主题:{topic}
请提供:
1. 市场规模和增长率
2. 主要参与者和市场份额
3. 趋势分析和预测
4. 关键洞察"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return {"source": "data", "content": response.choices[0].message.content}
def think(self, message: Message) -> Optional[Message]:
if message.msg_type == "analyze":
topic = message.content.get("topic", "")
result = self.analyze(topic)
return Message(
sender=self.name,
receiver=message.sender,
content=result,
msg_type="analysis_result"
)
return None
class ExpertInterviewer(BaseAgent):
"""专家访谈Agent"""
def __init__(self, name="ExpertInterviewer"):
super().__init__(name)
def interview(self, topic: str) -> Dict:
"""模拟专家访谈"""
prompt = f"""请模拟一位行业专家,接受关于以下主题的访谈:
主题:{topic}
请以专家视角:
1. 分享核心观点
2. 回答常见质疑
3. 给出建议和预测
4. 指出常见误区"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return {"source": "expert", "content": response.choices[0].message.content}
def think(self, message: Message) -> Optional[Message]:
if message.msg_type == "interview":
topic = message.content.get("topic", "")
result = self.interview(topic)
return Message(
sender=self.name,
receiver=message.sender,
content=result,
msg_type="interview_result"
)
return None
class ResearchTeam:
"""并行研究团队
多个Agent同时工作,最后汇总
"""
def __init__(self):
self.web = WebResearcher()
self.data = DataAnalyst()
self.expert = ExpertInterviewer()
def research(self, topic: str) -> Dict:
"""并行研究"""
print(f"\n{'='*60}")
print(f"研究主题:{topic}")
print("="*60)
results = {}
# 并行执行三个研究任务
with ThreadPoolExecutor(max_workers=3) as executor:
futures = {
executor.submit(self.web.research, topic): "web",
executor.submit(self.data.analyze, topic): "data",
executor.submit(self.expert.interview, topic): "expert",
}
for future in as_completed(futures):
source = futures[future]
try:
result = future.result()
results[source] = result
print(f"✅ [{source.upper()}] 研究完成")
except Exception as e:
print(f"❌ [{source.upper()}] 研究失败: {e}")
return results
def generate_report(self, results: Dict) -> str:
"""生成综合报告"""
prompt = f"""请基于以下三个角度的研究,生成一份综合分析报告:
【网络研究】
{results.get('web', {}).get('content', '')}
【数据分析】
{results.get('data', {}).get('content', '')}
【专家观点】
{results.get('expert', {}).get('content', '')}
报告要求:
1. 综合各角度观点
2. 找出共识和分歧
3. 给出最终结论"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
# 测试
if __name__ == "__main__":
team = ResearchTeam()
topic = "大语言模型在企业中的实际应用"
results = team.research(topic)
print("\n" + "="*60)
print("生成综合报告...")
print("="*60)
report = team.generate_report(results)
print(report[:500] + "...")
6.3 运行结果
$ python research_team.py
============================================================
研究主题:大语言模型在企业中的实际应用
============================================================
✅ [DATA] 研究完成
✅ [EXPERT] 研究完成
✅ [WEB] 研究完成
============================================================
生成综合报告...
============================================================
## 综合分析报告
### 核心发现
1. **应用场景**:客户服务、内容生成、数据分析是三大主要场景
2. **实施挑战**:数据安全、集成难度、人才匮乏
3. **投资回报**:平均6-12个月收回成本
6.4 小结
并行研究团队展示同时执行模式,多个Agent并行工作,最后汇总。
七、实战3:智能客服系统
7.1 需求分析
构建一个智能客服Multi-Agent系统:
- Triage Agent:分类用户问题
- Product Agent:处理产品咨询
- Order Agent:处理订单问题
- Refund Agent:处理退款
- Escalation Agent:处理复杂问题
7.2 代码实现

class TriageAgent(BaseAgent):
"""分流Agent:判断问题类型"""
def __init__(self, name="Triage"):
super().__init__(name)
def classify(self, query: str) -> str:
"""分类用户问题"""
prompt = f"""请判断用户问题的类型:
问题:{query}
类型选项:
- product:产品咨询
- order:订单问题
- refund:退款申请
- technical:技术问题
- complaint:投诉
- other:其他
只回复类型名称:"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content.strip().lower()
def think(self, message: Message) -> Optional[Message]:
if message.msg_type == "classify":
query = message.content.get("query", "")
category = self.classify(query)
return Message(
sender=self.name,
receiver=message.content.get("callback", "Manager"),
content={"category": category, "query": query},
msg_type="routed"
)
return None
class SpecializedAgent(BaseAgent):
"""专业Agent基类"""
def __init__(self, name: str, specialty: str):
super().__init__(name)
self.specialty = specialty
def handle(self, query: str) -> str:
"""处理专业问题"""
prompt = f"""你是一个{self.specialty}专家。请回答用户问题:
问题:{query}
回答要求:
1. 专业准确
2. 简洁清晰
3. 如需操作指引,给出步骤"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
def think(self, message: Message) -> Optional[Message]:
if message.msg_type == "handle":
query = message.content.get("query", "")
answer = self.handle(query)
return Message(
sender=self.name,
receiver=message.content.get("callback", "Manager"),
content={"answer": answer},
msg_type="response"
)
return None
class CustomerServiceManager:
"""客服系统管理器"""
def __init__(self):
self.triage = TriageAgent()
self.product = SpecializedAgent("Product", "产品")
self.order = SpecializedAgent("Order", "订单")
self.refund = SpecializedAgent("Refund", "退款")
self.escalation = SpecializedAgent("Escalation", "复杂问题")
# 路由表
self.router = {
"product": self.product,
"order": self.order,
"refund": self.refund,
}
def handle(self, user_query: str) -> str:
"""处理用户查询"""
print(f"\n👤 用户:{user_query}")
# 1. 分流
print("📋 分流中...")
category = self.triage.classify(user_query)
print(f" 分类:{category}")
# 2. 路由到专业Agent
if category in self.router:
print(f"🔄 路由到 {category} Agent...")
agent = self.router[category]
answer = agent.handle(user_query)
else:
# 复杂问题升级
print("🔄 升级到专员...")
answer = self.escalation.handle(user_query)
print(f"\n🤖 回复:{answer[:100]}...")
return answer
# 测试
if __name__ == "__main__":
cs = CustomerServiceManager()
queries = [
"你们的会员有什么权益?",
"我的订单什么时候发货?",
"我想退款,怎么操作?",
"这个产品价格能便宜吗?"
]
print("="*60)
print("智能客服系统演示")
print("="*60)
for q in queries:
cs.handle(q)
print("-"*60)
7.3 运行结果
$ python customer_service.py
============================================================
智能客服系统演示
============================================================
👤 用户:你们的会员有什么权益?
📋 分流中...
分类:product
🔄 路由到 product Agent...
👤 用户:我的订单什么时候发货?
📋 分流中...
分类:order
🔄 路由到 order Agent...
👤 用户:我想退款,怎么操作?
📋 分流中...
分类:refund
🔄 路由到 refund Agent...
八、总结与练习
8.1 要点回顾
- 顺序执行:A→B→C,适合流水线任务
- 并行执行:A/B/C同时,适合多角度研究
- 层次协作:Manager调度,适合复杂客服
8.2 选型指南
| 场景 | 推荐模式 |
|---|---|
| 代码审查 | 顺序执行 |
| 市场研究 | 并行执行 |
| 客服系统 | 层次协作 |
8.3 避坑指南
- Agent过多:不要创建太多Agent,复杂度会爆炸
- 通信混乱:明确消息格式和路由规则
- 循环依赖:避免Agent之间循环调用
8.4 延伸阅读
- LangChain Agents:Multi-Agent实现框架
- AutoGen:微软的Multi-Agent对话框架
- CrewAI:专注于Multi-Agent的框架
8.5 课后练习
基础题:实现一个简单的双人对话Agent系统。
进阶题:为代码审查团队添加一个”安全扫描”Agent。
挑战题:构建一个”AI辩论系统”,让两个Agent围绕话题辩论。