📚 学习教程

【高级应用】Day25:Multi-Agent系统开发–多Agent协作框架实战

· 2026-04-12 · 5 阅读

【高级应用】Day25:Multi-Agent系统开发–多Agent协作框架实战

👤 龙主编 📅 2026-04-12 👁️ 5 阅读 💬 0 评论

章节导语

单Agent的能力有限,但当多个Agent协作时,能解决的问题边界会急剧扩大。

想象一下:一个Agent负责规划,一个负责执行,一个负责验证,一个负责记忆……它们各司其职、相互协作,像一个高效的团队一样工作。这就是Multi-Agent系统的魅力。

本文系统讲解Multi-Agent系统的开发框架与实战,包括LangChain、AutoGPT、MetaGPT等主流框架的原理和使用,以及如何设计Agent间的协作机制、消息协议、任务分解与整合等核心技能。

一、从单Agent到Multi-Agent

1.1 单Agent的局限

单Agent系统的局限:

能力边界:一个Agent无法精通所有技能,总有知识盲区。

专注力分散:复杂任务需要多种能力,单Agent难以同时保持多任务的高质量。

错误难以纠正:如果推理过程出错,很难从中间步骤恢复。

无法并行:多个子任务必须串行执行,效率低下。

1.2 Multi-Agent的优势

Multi-Agent系统解决这些问题:

专业化分工:每个Agent专注于自己的领域,做到极致。

并行处理:相互独立的子任务可以同时执行,效率倍增。

互相校验:一个Agent的工作可以由另一个Agent检查。

emergent Behavior:多个Agent交互会产生单个Agent没有的涌现行为。

1.3 Multi-Agent架构模式

层次模式:一个主Agent负责任务分解,下发给多个子Agent执行。

对等模式:所有Agent平等协作,通过协商达成共识。

监督模式:一个监督Agent监控其他Agent的工作,决定是否需要干预。

市场模式:Agent之间像市场一样交易任务和能力。

LangChain
图1:LangChain架构

二、LangChain Agent开发

2.1 LangChain核心概念

from langchain.agents import Agent, Tool, initialize_agent
from langchain.agents.agent_types import AgentType
from langchain.chat_models import ChatOpenAI
from langchain.tools import BaseTool
from langchain.prompts import PromptTemplate
from typing import List, Optional

# 定义工具
class SearchTool(BaseTool):
    name = "web_search"
    description = "用于搜索互联网信息"
    
    def _run(self, query: str) -> str:
        # 实际实现中调用搜索API
        return f"搜索结果: {query}的相关信息..."

class CalculatorTool(BaseTool):
    name = "calculator"
    description = "用于数学计算"
    
    def _run(self, expression: str) -> str:
        try:
            result = eval(expression)
            return str(result)
        except Exception as e:
            return f"计算错误: {e}"

class WeatherTool(BaseTool):
    name = "weather"
    description = "查询天气信息"
    
    def _run(self, city: str) -> str:
        return f"{city}今天晴天,25度"

# 创建工具列表
tools = [SearchTool(), CalculatorTool(), WeatherTool()]

# 创建Agent
llm = ChatOpenAI(temperature=0)
agent = initialize_agent(
    tools=tools,
    llm=llm,
    agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

# 使用Agent
result = agent.run("北京今天的天气怎么样?请帮我计算一下明天和后天平均气温")
print(result)

2.2 ReAct模式的Agent

from langchain.agents import Agent, Tool
from langchain.agents.agent_types import AgentType
from langchain.prompts import PromptTemplate
from langchain.schema import AgentAction, AgentFinish

class ReActAgent:
    """ReAct模式的Agent: Reasoning + Acting"""
    
    def __init__(self, llm, tools, max_iterations=10):
        self.llm = llm
        self.tools = {t.name: t for t in tools}
        self.max_iterations = max_iterations
        
        self.prompt = PromptTemplate.from_template("""
你是一个AI助手,需要解决用户的问题。

你的推理过程:
{agent_scratchpad}

Question: {input}
{agent_scratchpad}

当前状态:
{observation}

下一步行动:
""")
    
    def run(self, query: str) -> str:
        """运行Agent"""
        observation = ""
        agent_scratchpad = ""
        steps = []
        
        for i in range(self.max_iterations):
            # 构造提示
            prompt = self.prompt.format(
                input=query,
                observation=observation,
                agent_scratchpad=agent_scratchpad
            )
            
            # 调用LLM决定行动
            response = self.llm.predict(prompt)
            
            # 解析响应
            action, action_input = self._parse_response(response)
            
            if action == "Final":
                return action_input
            
            # 执行行动
            if action in self.tools:
                observation = self.tools[action].run(action_input)
            else:
                observation = f"未知工具: {action}"
            
            # 更新思考过程
            agent_scratchpad += f"\n思考: {response}\n"
            agent_scratchpad += f"行动: {action}({action_input})\n"
            agent_scratchpad += f"观察: {observation}\n"
        
        return "达到最大迭代次数"
    
    def _parse_response(self, response: str) -> tuple:
        """解析LLM响应,提取行动"""
        # 简化实现
        if "FINAL:" in response:
            return ("Final", response.split("FINAL:")[1].strip())
        
        # 假设格式: Action: tool_name\nAction Input: input
        lines = response.split("\n")
        action, action_input = "Unknown", ""
        
        for line in lines:
            if line.startswith("Action:"):
                action = line.replace("Action:", "").strip()
            elif line.startswith("Action Input:"):
                action_input = line.replace("Action Input:", "").strip()
        
        return (action, action_input)

# 使用
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(temperature=0)

agent = ReActAgent(llm=llm, tools=[SearchTool(), CalculatorTool()])
result = agent.run("计算一下 123 * 456 + 789")
print(result)
AutoGPT
图2:自主Agent执行循环

三、AutoGPT与自主Agent

3.1 AutoGPT原理

AutoGPT的核心是自主任务执行循环:

  1. 目标分解:将大目标分解为可执行的小任务
  2. 任务执行:调用工具执行当前任务
  3. 结果评估:评估任务结果是否满足目标
  4. 自我反思:反思做得好和不好的地方
  5. 计划更新:根据结果更新后续计划

3.2 自主Agent实现

from typing import List, Dict
from dataclasses import dataclass, field
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Task:
    id: str
    description: str
    status: TaskStatus = TaskStatus.PENDING
    result: str = ""
    dependencies: List[str] = field(default_factory=list)
    created_at: float = 0
    completed_at: float = 0

class AutonomousAgent:
    """自主Agent实现"""
    
    def __init__(self, name: str, llm, tools: List):
        self.name = name
        self.llm = llm
        self.tools = {t.name: t for t in tools}
        self.tasks: Dict[str, Task] = {}
        self.completed_tasks: List[Task] = []
    
    def add_task(self, task: Task):
        """添加任务"""
        self.tasks[task.id] = task
    
    def execute_task_loop(self, max_iterations: int = 100):
        """自主执行循环"""
        iteration = 0
        
        while iteration < max_iterations:
            iteration += 1
            
            # 1. 找出一个可执行的任务
            ready_task = self._get_ready_task()
            
            if not ready_task:
                # 没有可执行的任务了
                if self._all_tasks_complete():
                    print(f"所有任务完成!共 {len(self.completed_tasks)} 个")
                    break
                else:
                    print("还有未完成的任务,但没有可执行的")
                    break
            
            print(f"[{iteration}] 执行任务: {ready_task.description}")
            
            # 2. 执行任务
            try:
                result = self._execute(ready_task)
                ready_task.result = result
                ready_task.status = TaskStatus.COMPLETED
                self.completed_tasks.append(ready_task)
                print(f"  完成: {result[:100]}...")
                
            except Exception as e:
                ready_task.status = TaskStatus.FAILED
                print(f"  失败: {e}")
    
    def _get_ready_task(self) -> Task:
        """获取可执行的任务"""
        for task in self.tasks.values():
            if task.status != TaskStatus.PENDING:
                continue
            
            # 检查依赖是否都完成
            deps_done = all(
                self.tasks.get(dep_id, Task(id=dep_id, description="")).status == TaskStatus.COMPLETED
                for dep_id in task.dependencies
            )
            
            if deps_done:
                task.status = TaskStatus.IN_PROGRESS
                return task
        
        return None
    
    def _execute(self, task: Task) -> str:
        """执行单个任务"""
        # 构造上下文
        context = self._build_context()
        
        prompt = f"""你是 {self.name},需要完成任务。

当前上下文:
{context}

任务:{task.description}

请执行任务,返回结果。如果需要使用工具,使用以下格式:
TOOL: tool_name
INPUT: tool_input
"""
        
        response = self.llm.predict(prompt)
        
        # 解析工具调用
        if "TOOL:" in response:
            tool_name = response.split("TOOL:")[1].split("\n")[0].strip()
            tool_input = response.split("INPUT:")[1].split("\n")[0].strip()
            
            if tool_name in self.tools:
                return self.tools[tool_name].run(tool_input)
        
        return response
    
    def _build_context(self) -> str:
        """构建执行上下文"""
        context = "已完成的任务:\n"
        for task in self.completed_tasks:
            context += f"- {task.description}: {task.result[:50]}...\n"
        return context
    
    def _all_tasks_complete(self) -> bool:
        """检查是否所有任务都完成"""
        return all(
            t.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]
            for t in self.tasks.values()
        )

# 使用
agent = AutonomousAgent(
    name="研究助手",
    llm=ChatOpenAI(temperature=0),
    tools=[SearchTool(), CalculatorTool()]
)

# 添加任务
agent.add_task(Task(id="1", description="搜索最新AI研究进展"))
agent.add_task(Task(id="2", description="分析AI发展趋势", dependencies=["1"]))
agent.add_task(Task(id="3", description="总结关键发现", dependencies=["2"]))

agent.execute_task_loop()

四、Agent间协作机制

4.1 消息传递协议

Multi-Agent系统的核心是消息传递:

广播:一个Agent向所有Agent发送消息。

点对点:一个Agent向特定Agent发送消息。

发布-订阅:Agent订阅感兴趣的话题,发布者不需要知道订阅者。

4.2 协作模式实现

from typing import Dict, List, Callable, Any
from dataclasses import dataclass
from enum import Enum
import time

class MessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"
    BROADCAST = "broadcast"
    HEARTBEAT = "heartbeat"

@dataclass
class Message:
    from_agent: str
    to_agent: str  # "*" 表示广播
    msg_type: MessageType
    content: Any
    timestamp: float = 0
    
    def __post_init__(self):
        if self.timestamp == 0:
            self.timestamp = time.time()

class AgentRegistry:
    """Agent注册中心"""
    
    def __init__(self):
        self.agents: Dict[str, Any] = {}
    
    def register(self, name: str, agent: Any):
        self.agents[name] = agent
    
    def get(self, name: str) -> Any:
        return self.agents.get(name)
    
    def list_agents(self) -> List[str]:
        return list(self.agents.keys())

class MessageBus:
    """消息总线 - Agent间通信的中介"""
    
    def __init__(self):
        self.registry = AgentRegistry()
        self.subscriptions: Dict[str, List[str]] = {}  # topic -> [agent_names]
        self.message_queue: List[Message] = []
    
    def register_agent(self, agent: Any):
        self.registry.register(agent.name, agent)
    
    def subscribe(self, topic: str, agent_name: str):
        if topic not in self.subscriptions:
            self.subscriptions[topic] = []
        if agent_name not in self.subscriptions[topic]:
            self.subscriptions[topic].append(agent_name)
    
    def send_message(self, msg: Message):
        """发送消息"""
        if msg.to_agent == "*":
            # 广播
            for agent_name in self.registry.list_agents():
                if agent_name != msg.from_agent:
                    agent = self.registry.get(agent_name)
                    if agent:
                        agent.receive_message(msg)
        else:
            # 点对点
            agent = self.registry.get(msg.to_agent)
            if agent:
                agent.receive_message(msg)
    
    def publish(self, topic: str, msg: Message):
        """发布-订阅"""
        for agent_name in self.subscriptions.get(topic, []):
            agent = self.registry.get(agent_name)
            if agent:
                agent.receive_message(msg)

class BaseAgent:
    """Agent基类"""
    
    def __init__(self, name: str, bus: MessageBus):
        self.name = name
        self.bus = bus
        bus.register_agent(self)
    
    def send_message(self, to: str, msg_type: MessageType, content: Any):
        msg = Message(from_agent=self.name, to_agent=to, msg_type=msg_type, content=content)
        self.bus.send_message(msg)
    
    def broadcast(self, msg_type: MessageType, content: Any):
        msg = Message(from_agent=self.name, to_agent="*", msg_type=msg_type, content=content)
        self.bus.send_message(msg)
    
    def receive_message(self, msg: Message):
        """处理收到的消息 - 子类实现"""
        raise NotImplementedError

# 具体Agent实现
class PlannerAgent(BaseAgent):
    """规划Agent"""
    
    def __init__(self, name: str, bus: MessageBus, llm):
        super().__init__(name, bus)
        self.llm = llm
    
    def receive_message(self, msg: Message):
        if msg.msg_type == MessageType.REQUEST:
            # 收到任务请求,进行规划
            task = msg.content
            plan = self.create_plan(task)
            
            # 向执行Agent发送计划
            self.send_message("executor", MessageType.REQUEST, plan)
    
    def create_plan(self, task: str) -> List[str]:
        prompt = f"将以下任务分解为步骤:{task}"
        response = self.llm.predict(prompt)
        # 简化:假设LLM返回逗号分隔的步骤
        return [s.strip() for s in response.split(",")]

class ExecutorAgent(BaseAgent):
    """执行Agent"""
    
    def __init__(self, name: str, bus: MessageBus, tools: List):
        super().__init__(name, bus)
        self.tools = {t.name: t for t in tools}
    
    def receive_message(self, msg: Message):
        if msg.msg_type == MessageType.REQUEST:
            plan = msg.content
            results = []
            
            for step in plan:
                # 执行每个步骤
                result = self.execute_step(step)
                results.append({"step": step, "result": result})
                
                # 广播进度
                self.broadcast(MessageType.BROADCAST, {"progress": len(results), "total": len(plan)})
            
            # 向规划Agent报告完成
            self.send_message("planner", MessageType.RESPONSE, results)
    
    def execute_step(self, step: str) -> str:
        # 简化实现
        return f"完成: {step}"

# 使用
bus = MessageBus()
planner = PlannerAgent("planner", bus, ChatOpenAI(temperature=0))
executor = ExecutorAgent("executor", bus, [SearchTool()])

# 发送任务
planner.send_message("planner", MessageType.REQUEST, "帮我研究一下最新的AI技术趋势")
实战
图3:Multi-Agent协作

五、Multi-Agent实战项目

5.1 项目:自动化研究助手

需求:构建一个自动化研究助手,可以自主搜索、阅读、总结AI领域的研究进展。

架构设计:

协调Agent:负责任务分解、进度跟踪、结果整合。

搜索Agent:负责搜索相关论文和信息。

阅读Agent:负责阅读和理解论文内容。

写作Agent:负责将研究结果整理成报告。

5.2 完整实现

import asyncio
from typing import List, Dict
from dataclasses import dataclass

@dataclass
class ResearchTask:
    topic: str
    max_sources: int = 10
    output_format: str = "报告"

class CoordinatorAgent(BaseAgent):
    """协调Agent - 管理整个研究流程"""
    
    def __init__(self, name: str, bus: MessageBus, llm):
        super().__init__(name, bus)
        self.llm = llm
        self.pending_results: Dict[str, List] = {}
    
    async def run(self, task: ResearchTask):
        """运行研究任务"""
        # 1. 分解任务
        subtasks = await self.decompose_task(task)
        
        # 2. 分发给专业Agent
        for subtask in subtasks:
            if subtask["type"] == "search":
                self.send_message("searcher", MessageType.REQUEST, subtask)
            elif subtask["type"] == "read":
                self.send_message("reader", MessageType.REQUEST, subtask)
        
        # 3. 等待结果整合
        # ...
    
    async def decompose_task(self, task: ResearchTask) -> List[Dict]:
        prompt = f"""分解以下研究任务为子任务:
任务:{task.topic}
要求:
- 搜索相关论文
- 阅读理解关键内容
- 整理总结

请用JSON格式返回子任务列表。"""
        
        response = self.llm.predict(prompt)
        # 解析返回的子任务
        return [
            {"type": "search", "query": task.topic, "max_results": task.max_sources},
            {"type": "read", "papers": ["paper1", "paper2"]},
        ]

class SearchAgent(BaseAgent):
    """搜索Agent"""
    
    def __init__(self, name: str, bus: MessageBus, search_api):
        super().__init__(name, bus)
        self.search_api = search_api
    
    async def search(self, query: str, max_results: int) -> List[Dict]:
        # 调用搜索API
        results = await self.search_api.search(query, max_results)
        return [
            {
                "title": r["title"],
                "url": r["url"],
                "abstract": r.get("abstract", ""),
                "date": r.get("date", "")
            }
            for r in results
        ]

class ReaderAgent(BaseAgent):
    """阅读Agent"""
    
    def __init__(self, name: str, bus: MessageBus, llm):
        super().__init__(name, bus)
        self.llm = llm
    
    async def read_paper(self, paper: Dict) -> Dict:
        # 简化实现
        return {
            "title": paper["title"],
            "summary": f"这是{paper['title']}的总结...",
            "key_findings": ["发现1", "发现2"],
            "limitations": ["局限1"]
        }

class WriterAgent(BaseAgent):
    """写作Agent"""
    
    def __init__(self, name: str, bus: MessageBus, llm):
        super().__init__(name, bus)
        self.llm = llm
    
    async def write_report(self, findings: List[Dict], topic: str) -> str:
        prompt = f"""根据以下研究发现,写一份关于{topic}的研究报告:

{findings}

报告要求:
- 结构清晰
- 包含关键发现
- 有深度分析
- 指出局限性
"""
        return self.llm.predict(prompt)

# 异步运行
async def main():
    from langchain.chat_models import ChatOpenAI
    llm = ChatOpenAI(temperature=0)
    
    bus = MessageBus()
    coordinator = CoordinatorAgent("coordinator", bus, llm)
    
    task = ResearchTask(topic="LLM最新研究进展", max_sources=10)
    await coordinator.run(task)

# asyncio.run(main())

六、框架对比与选型

6.1 主流框架对比

框架 特点 适用场景
LangChain 功能丰富,生态完善 复杂RAG、工具调用
AutoGPT 自主性强,任务分解 需要自主探索的任务
MetaGPT 多角色协作,SOP驱动 软件开发、产品设计
CrewAI 角色定义清晰,易用 快速构建Agent团队

6.2 选型建议

简单任务:使用LangChain Agent,配合ReAct模式。

复杂探索:使用AutoGPT或类似的自助Agent框架。

团队协作:使用MetaGPT或CrewAI,定义清晰的Agent角色。

生产系统:基于LangChain构建,添加监控和错误处理。

七、总结

Multi-Agent是AI系统的未来。单Agent能力有限,多Agent协作能解决更复杂的问题。

框架选择要适配场景。不是越复杂的框架越好,要根据具体需求选择。

Agent间协作机制是核心。消息协议、任务分解、结果整合都需要精心设计。

实践出真知。多动手实验,才能真正掌握Multi-Agent的开发。

延伸阅读

  • LangChain官方文档
  • AutoGPT GitHub
  • MetaGPT论文
  • CrewAI文档

课后练习

基础题:使用LangChain构建一个有工具调用能力的Agent。

进阶题:用Multi-Agent模式实现一个简单的代码审查系统。

挑战题:构建一个自动化研究助手,可以自主完成一个主题的研究报告。

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

微信公众号二维码

扫码关注公众号

QQ
QQ二维码

扫码添加QQ