📚 学习教程

【高级应用】Day7:Multi-Agent实战–构建高效的AI协作系统

· 2026-04-12 · 7 阅读

【高级应用】Day7:Multi-Agent实战–构建高效的AI协作系统

👤 龙主编 📅 2026-04-12 👁️ 7 阅读 💬 0 评论

章节导语

一个AI Agent能力有限,但如果多个Agent协作呢?就像一个团队,有人负责规划,有人负责执行,有人负责检查。

Multi-Agent不是简单地把多个Agent串起来,而是让它们有分工、有协作、有通信。真正的协作是1+1>2。

本文通过3个实战项目,系统讲解Multi-Agent的协作模式和代码实现。

一、前置说明

1.1 学习路径

阶段 内容
基础 Agent架构(Day1)
进阶 多Agent协作

1.2 读者需要的基础

  • Python面向对象:类、继承、多态
  • Agent理解:知道什么是Agent、Tool
  • API调用:会调用LLM

1.3 学习目标

学完本文,你将能够:

  • 理解Multi-Agent的3种协作模式
  • 实现Agent间的通信机制
  • 构建一个完整的多Agent系统

二、Multi-Agent核心概念

2.1 为什么需要Multi-Agent

单Agent的局限性:

  • 能力单一,什么都做但什么都做不精
  • 上下文有限,复杂任务处理不过来
  • 容易出错,没有复查机制

Multi-Agent的优势:

  • 分工专业:专家Agent做专业的事
  • 并行处理:多个Agent同时工作
  • 互相校验:Agent之间可以检查彼此的工作
  • 可扩展:随时添加新的Agent

2.2 协作模式

协作模式
图1:Multi-Agent协作模式
模式 特点 适用场景
顺序执行 A→B→C 流水线处理
并行执行 A/B/C同时 多路查询
层次协作 Manager调度 复杂任务分解

2.3 Agent通信机制

Multi-Agent的核心是通信。常见方式:

  • 共享消息队列:Agent之间通过队列传递消息
  • 共享内存:共用一个存储,读取他人结果
  • 直接调用:Agent直接调用其他Agent的方法

三、环境配置

3.1 安装依赖

pip install openai anthropic

# 建议安装版本控制
pip install gitpython

3.2 API Key配置

# .env
OPENAI_API_KEY=sk-xxxx
ANTHROPIC_API_KEY=sk-ant-xxxx

四、基础架构:Agent基类

4.1 设计思路

先设计一个Agent基类,包含通用能力:

  • 消息处理
  • 状态管理
  • 工具注册
  • 通信接口

4.2 代码实现

from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum

class AgentState(Enum):
    """Agent状态"""
    IDLE = "idle"
    WORKING = "working"
    WAITING = "waiting"
    DONE = "done"
    ERROR = "error"

@dataclass
class Message:
    """消息结构"""
    sender: str           # 发送者
    receiver: str         # 接收者(可选,None表示广播)
    content: Any          # 消息内容
    msg_type: str = "text"  # 消息类型

class BaseAgent(ABC):
    """Agent基类
    
    所有Agent的父类,定义通用接口
    """
    
    def __init__(self, name: str):
        self.name = name
        self.state = AgentState.IDLE
        self.tools = {}  # 注册的工具
        self.context = {}  # 共享上下文
        self.inbox = []  # 收件箱
    
    def register_tool(self, name: str, func: callable):
        """注册工具
        
        参数:
            name: 工具名
            func: 工具函数
        """
        self.tools[name] = func
        print(f"🔧 [{self.name}] 注册工具: {name}")
    
    def send_message(self, receiver: str, content: Any, msg_type: str = "text"):
        """发送消息
        
        参数:
            receiver: 接收者名称
            content: 消息内容
            msg_type: 消息类型
        """
        msg = Message(
            sender=self.name,
            receiver=receiver,
            content=content,
            msg_type=msg_type
        )
        # 实际发送需要通过消息队列或Manager
        return msg
    
    def receive_message(self, message: Message):
        """接收消息"""
        self.inbox.append(message)
        print(f"📩 [{self.name}] 收到消息 from {message.sender}")
    
    @abstractmethod
    def think(self, message: Message) -> Optional[Message]:
        """思考:处理消息,返回回复或新消息
        
        这是抽象方法,子类必须实现
        """
        pass
    
    def run(self):
        """运行Agent"""
        print(f"🚀 [{self.name}] 启动")
        self.state = AgentState.WORKING
        
        # 处理收件箱
        while self.inbox:
            message = self.inbox.pop(0)
            response = self.think(message)
            if response:
                self.send_message(response.receiver, response.content)
        
        self.state = AgentState.DONE
        print(f"✅ [{self.name}] 完成")

4.3 小结

BaseAgent定义了通用接口,让具体Agent专注于实现think方法。

五、实战1:代码审查团队

5.1 需求分析

构建一个代码审查Multi-Agent系统:

  • Planner Agent:分析代码,确定审查重点
  • Reviewer Agent:执行具体审查
  • Summarizer Agent:汇总审查结果

5.2 代码实现

from openai import OpenAI

client = OpenAI()

class CodeReviewer(BaseAgent):
    """代码审查Agent"""
    
    def __init__(self, name="Reviewer"):
        super().__init__(name)
        self.register_tool("review_code", self.review_code)
    
    def review_code(self, code: str) -> str:
        """审查代码
        
        实际调用LLM进行代码审查
        """
        prompt = f"""请审查以下Python代码,检查:
1. 潜在bug
2. 性能问题
3. 安全漏洞
4. 代码风格

代码:
```python
{code}
```

请分点回答:"""
        
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    
    def think(self, message: Message) -> Optional[Message]:
        """处理审查请求"""
        if message.msg_type == "review_request":
            code = message.content.get("code", "")
            result = self.review_code(code)
            
            return Message(
                sender=self.name,
                receiver=message.sender,
                content={"review": result},
                msg_type="review_result"
            )
        return None

class CodeSummarizer(BaseAgent):
    """结果汇总Agent"""
    
    def __init__(self, name="Summarizer"):
        super().__init__(name)
    
    def summarize_reviews(self, reviews: List[Dict]) -> str:
        """汇总多个审查结果"""
        prompt = f"""请汇总以下代码审查结果,给出一份综合报告:

{chr(10).join([f"审查{i+1}:{r.get('review', '')}" for i, r in enumerate(reviews)])}

报告要求:
1. 问题分类(严重/中等/建议)
2. 修复优先级
3. 总体评价"""
        
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    
    def think(self, message: Message) -> Optional[Message]:
        if message.msg_type == "summarize":
            reviews = message.content.get("reviews", [])
            summary = self.summarize_reviews(reviews)
            
            return Message(
                sender=self.name,
                receiver=message.sender,
                content={"summary": summary},
                msg_type="final_report"
            )
        return None

class CodeReviewTeam:
    """代码审查团队
    
    协调多个Agent工作
    """
    
    def __init__(self):
        self.planner = BaseAgent("Planner")
        self.reviewer = CodeReviewer("Reviewer")
        self.summarizer = CodeSummarizer("Summarizer")
        self.agents = [self.planner, self.reviewer, self.summarizer]
    
    def review(self, code: str) -> str:
        """执行代码审查"""
        print(f"\n{'='*60}")
        print("代码审查团队启动")
        print("="*60)
        
        # 1. Planner分析代码
        print("\n📋 [Planner] 分析代码结构...")
        
        # 2. Reviewer审查
        print("🔍 [Reviewer] 执行代码审查...")
        review_result = self.reviewer.review_code(code)
        
        # 3. Summarizer汇总
        print("📝 [Summarizer] 汇总审查结果...")
        summary = self.summarizer.summarize_reviews([{"review": review_result}])
        
        return summary

# 测试
if __name__ == "__main__":
    team = CodeReviewTeam()
    
    sample_code = """
def calculate_user_score(user_id, items):
    total = 0
    for item in items:
        total += item['price'] * item['quantity']
    
    # 这里可能有问题
    if user_id in items:
        total = total * 0.9
    
    return total
"""
    
    result = team.review(sample_code)
    print("\n" + "="*60)
    print("最终审查报告:")
    print("="*60)
    print(result)

5.3 运行结果

$ python code_review_team.py

============================================================
代码审查团队启动
============================================================

📋 [Planner] 分析代码结构...
🔍 [Reviewer] 执行代码审查...
📝 [Summarizer] 汇总审查结果...

============================================================
最终审查报告:
============================================================

## 综合代码审查报告

### 问题分类

**严重问题:**
1. 逻辑错误:`user_id in items` 这里items是商品列表,不是用户ID列表

**中等问题:**
1. 变量命名不清晰
2. 缺少类型注解

**建议:**
1. 可以用sum()函数简化循环

5.4 小结

代码审查团队展示了顺序协作模式:Planner → Reviewer → Summarizer。

六、实战2:并行研究团队

6.1 需求分析

一个研究任务,需要同时从多个角度分析:

  • Web Researcher:搜索网络资料
  • Data Analyst:分析数据
  • Expert Interviewer:模拟专家访谈

6.2 代码实现

并行研究
图2:并行研究流程
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict

class WebResearcher(BaseAgent):
    """网络研究Agent"""
    
    def __init__(self, name="WebResearcher"):
        super().__init__(name)
    
    def research(self, topic: str) -> Dict:
        """搜索网络资料"""
        prompt = f"""请研究以下主题,提供详细信息:

主题:{topic}

请提供:
1. 核心概念解释
2. 最新发展动态
3. 关键数据和案例
4. 参考来源"""
        
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        )
        return {"source": "web", "content": response.choices[0].message.content}
    
    def think(self, message: Message) -> Optional[Message]:
        if message.msg_type == "research":
            topic = message.content.get("topic", "")
            result = self.research(topic)
            return Message(
                sender=self.name,
                receiver=message.sender,
                content=result,
                msg_type="research_result"
            )
        return None

class DataAnalyst(BaseAgent):
    """数据分析Agent"""
    
    def __init__(self, name="DataAnalyst"):
        super().__init__(name)
    
    def analyze(self, topic: str) -> Dict:
        """分析数据"""
        prompt = f"""请分析以下主题的相关数据:

主题:{topic}

请提供:
1. 市场规模和增长率
2. 主要参与者和市场份额
3. 趋势分析和预测
4. 关键洞察"""
        
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        )
        return {"source": "data", "content": response.choices[0].message.content}
    
    def think(self, message: Message) -> Optional[Message]:
        if message.msg_type == "analyze":
            topic = message.content.get("topic", "")
            result = self.analyze(topic)
            return Message(
                sender=self.name,
                receiver=message.sender,
                content=result,
                msg_type="analysis_result"
            )
        return None

class ExpertInterviewer(BaseAgent):
    """专家访谈Agent"""
    
    def __init__(self, name="ExpertInterviewer"):
        super().__init__(name)
    
    def interview(self, topic: str) -> Dict:
        """模拟专家访谈"""
        prompt = f"""请模拟一位行业专家,接受关于以下主题的访谈:

主题:{topic}

请以专家视角:
1. 分享核心观点
2. 回答常见质疑
3. 给出建议和预测
4. 指出常见误区"""
        
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        )
        return {"source": "expert", "content": response.choices[0].message.content}
    
    def think(self, message: Message) -> Optional[Message]:
        if message.msg_type == "interview":
            topic = message.content.get("topic", "")
            result = self.interview(topic)
            return Message(
                sender=self.name,
                receiver=message.sender,
                content=result,
                msg_type="interview_result"
            )
        return None

class ResearchTeam:
    """并行研究团队
    
    多个Agent同时工作,最后汇总
    """
    
    def __init__(self):
        self.web = WebResearcher()
        self.data = DataAnalyst()
        self.expert = ExpertInterviewer()
    
    def research(self, topic: str) -> Dict:
        """并行研究"""
        print(f"\n{'='*60}")
        print(f"研究主题:{topic}")
        print("="*60)
        
        results = {}
        
        # 并行执行三个研究任务
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures = {
                executor.submit(self.web.research, topic): "web",
                executor.submit(self.data.analyze, topic): "data",
                executor.submit(self.expert.interview, topic): "expert",
            }
            
            for future in as_completed(futures):
                source = futures[future]
                try:
                    result = future.result()
                    results[source] = result
                    print(f"✅ [{source.upper()}] 研究完成")
                except Exception as e:
                    print(f"❌ [{source.upper()}] 研究失败: {e}")
        
        return results
    
    def generate_report(self, results: Dict) -> str:
        """生成综合报告"""
        prompt = f"""请基于以下三个角度的研究,生成一份综合分析报告:

【网络研究】
{results.get('web', {}).get('content', '')}

【数据分析】
{results.get('data', {}).get('content', '')}

【专家观点】
{results.get('expert', {}).get('content', '')}

报告要求:
1. 综合各角度观点
2. 找出共识和分歧
3. 给出最终结论"""
        
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

# 测试
if __name__ == "__main__":
    team = ResearchTeam()
    
    topic = "大语言模型在企业中的实际应用"
    results = team.research(topic)
    
    print("\n" + "="*60)
    print("生成综合报告...")
    print("="*60)
    
    report = team.generate_report(results)
    print(report[:500] + "...")

6.3 运行结果

$ python research_team.py

============================================================
研究主题:大语言模型在企业中的实际应用
============================================================
✅ [DATA] 研究完成
✅ [EXPERT] 研究完成
✅ [WEB] 研究完成

============================================================
生成综合报告...
============================================================
## 综合分析报告

### 核心发现

1. **应用场景**:客户服务、内容生成、数据分析是三大主要场景
2. **实施挑战**:数据安全、集成难度、人才匮乏
3. **投资回报**:平均6-12个月收回成本

6.4 小结

并行研究团队展示同时执行模式,多个Agent并行工作,最后汇总。

七、实战3:智能客服系统

7.1 需求分析

构建一个智能客服Multi-Agent系统:

  • Triage Agent:分类用户问题
  • Product Agent:处理产品咨询
  • Order Agent:处理订单问题
  • Refund Agent:处理退款
  • Escalation Agent:处理复杂问题

7.2 代码实现

客服系统
图3:智能客服架构
class TriageAgent(BaseAgent):
    """分流Agent:判断问题类型"""
    
    def __init__(self, name="Triage"):
        super().__init__(name)
    
    def classify(self, query: str) -> str:
        """分类用户问题"""
        prompt = f"""请判断用户问题的类型:

问题:{query}

类型选项:
- product:产品咨询
- order:订单问题
- refund:退款申请
- technical:技术问题
- complaint:投诉
- other:其他

只回复类型名称:"""
        
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content.strip().lower()
    
    def think(self, message: Message) -> Optional[Message]:
        if message.msg_type == "classify":
            query = message.content.get("query", "")
            category = self.classify(query)
            
            return Message(
                sender=self.name,
                receiver=message.content.get("callback", "Manager"),
                content={"category": category, "query": query},
                msg_type="routed"
            )
        return None

class SpecializedAgent(BaseAgent):
    """专业Agent基类"""
    
    def __init__(self, name: str, specialty: str):
        super().__init__(name)
        self.specialty = specialty
    
    def handle(self, query: str) -> str:
        """处理专业问题"""
        prompt = f"""你是一个{self.specialty}专家。请回答用户问题:

问题:{query}

回答要求:
1. 专业准确
2. 简洁清晰
3. 如需操作指引,给出步骤"""
        
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    
    def think(self, message: Message) -> Optional[Message]:
        if message.msg_type == "handle":
            query = message.content.get("query", "")
            answer = self.handle(query)
            
            return Message(
                sender=self.name,
                receiver=message.content.get("callback", "Manager"),
                content={"answer": answer},
                msg_type="response"
            )
        return None

class CustomerServiceManager:
    """客服系统管理器"""
    
    def __init__(self):
        self.triage = TriageAgent()
        self.product = SpecializedAgent("Product", "产品")
        self.order = SpecializedAgent("Order", "订单")
        self.refund = SpecializedAgent("Refund", "退款")
        self.escalation = SpecializedAgent("Escalation", "复杂问题")
        
        # 路由表
        self.router = {
            "product": self.product,
            "order": self.order,
            "refund": self.refund,
        }
    
    def handle(self, user_query: str) -> str:
        """处理用户查询"""
        print(f"\n👤 用户:{user_query}")
        
        # 1. 分流
        print("📋 分流中...")
        category = self.triage.classify(user_query)
        print(f"   分类:{category}")
        
        # 2. 路由到专业Agent
        if category in self.router:
            print(f"🔄 路由到 {category} Agent...")
            agent = self.router[category]
            answer = agent.handle(user_query)
        else:
            # 复杂问题升级
            print("🔄 升级到专员...")
            answer = self.escalation.handle(user_query)
        
        print(f"\n🤖 回复:{answer[:100]}...")
        return answer

# 测试
if __name__ == "__main__":
    cs = CustomerServiceManager()
    
    queries = [
        "你们的会员有什么权益?",
        "我的订单什么时候发货?",
        "我想退款,怎么操作?",
        "这个产品价格能便宜吗?"
    ]
    
    print("="*60)
    print("智能客服系统演示")
    print("="*60)
    
    for q in queries:
        cs.handle(q)
        print("-"*60)

7.3 运行结果

$ python customer_service.py

============================================================
智能客服系统演示
============================================================

👤 用户:你们的会员有什么权益?
📋 分流中...
   分类:product
🔄 路由到 product Agent...

👤 用户:我的订单什么时候发货?
📋 分流中...
   分类:order
🔄 路由到 order Agent...

👤 用户:我想退款,怎么操作?
📋 分流中...
   分类:refund
🔄 路由到 refund Agent...

八、总结与练习

8.1 要点回顾

  • 顺序执行:A→B→C,适合流水线任务
  • 并行执行:A/B/C同时,适合多角度研究
  • 层次协作:Manager调度,适合复杂客服

8.2 选型指南

场景 推荐模式
代码审查 顺序执行
市场研究 并行执行
客服系统 层次协作

8.3 避坑指南

  • Agent过多:不要创建太多Agent,复杂度会爆炸
  • 通信混乱:明确消息格式和路由规则
  • 循环依赖:避免Agent之间循环调用

8.4 延伸阅读

  • LangChain Agents:Multi-Agent实现框架
  • AutoGen:微软的Multi-Agent对话框架
  • CrewAI:专注于Multi-Agent的框架

8.5 课后练习

基础题:实现一个简单的双人对话Agent系统。

进阶题:为代码审查团队添加一个”安全扫描”Agent。

挑战题:构建一个”AI辩论系统”,让两个Agent围绕话题辩论。

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

微信公众号二维码

扫码关注公众号

QQ
QQ二维码

扫码添加QQ