【高级应用】Day25:Multi-Agent系统开发–多Agent协作框架实战
章节导语
单Agent的能力有限,但当多个Agent协作时,能解决的问题边界会急剧扩大。
想象一下:一个Agent负责规划,一个负责执行,一个负责验证,一个负责记忆……它们各司其职、相互协作,像一个高效的团队一样工作。这就是Multi-Agent系统的魅力。
本文系统讲解Multi-Agent系统的开发框架与实战,包括LangChain、AutoGPT、MetaGPT等主流框架的原理和使用,以及如何设计Agent间的协作机制、消息协议、任务分解与整合等核心技能。
一、从单Agent到Multi-Agent
1.1 单Agent的局限
单Agent系统的局限:
能力边界:一个Agent无法精通所有技能,总有知识盲区。
专注力分散:复杂任务需要多种能力,单Agent难以同时保持多任务的高质量。
错误难以纠正:如果推理过程出错,很难从中间步骤恢复。
无法并行:多个子任务必须串行执行,效率低下。
1.2 Multi-Agent的优势
Multi-Agent系统解决这些问题:
专业化分工:每个Agent专注于自己的领域,做到极致。
并行处理:相互独立的子任务可以同时执行,效率倍增。
互相校验:一个Agent的工作可以由另一个Agent检查。
emergent Behavior:多个Agent交互会产生单个Agent没有的涌现行为。
1.3 Multi-Agent架构模式
层次模式:一个主Agent负责任务分解,下发给多个子Agent执行。
对等模式:所有Agent平等协作,通过协商达成共识。
监督模式:一个监督Agent监控其他Agent的工作,决定是否需要干预。
市场模式:Agent之间像市场一样交易任务和能力。

二、LangChain Agent开发
2.1 LangChain核心概念
from langchain.agents import Agent, Tool, initialize_agent
from langchain.agents.agent_types import AgentType
from langchain.chat_models import ChatOpenAI
from langchain.tools import BaseTool
from langchain.prompts import PromptTemplate
from typing import List, Optional
# 定义工具
class SearchTool(BaseTool):
name = "web_search"
description = "用于搜索互联网信息"
def _run(self, query: str) -> str:
# 实际实现中调用搜索API
return f"搜索结果: {query}的相关信息..."
class CalculatorTool(BaseTool):
name = "calculator"
description = "用于数学计算"
def _run(self, expression: str) -> str:
try:
result = eval(expression)
return str(result)
except Exception as e:
return f"计算错误: {e}"
class WeatherTool(BaseTool):
name = "weather"
description = "查询天气信息"
def _run(self, city: str) -> str:
return f"{city}今天晴天,25度"
# 创建工具列表
tools = [SearchTool(), CalculatorTool(), WeatherTool()]
# 创建Agent
llm = ChatOpenAI(temperature=0)
agent = initialize_agent(
tools=tools,
llm=llm,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
verbose=True
)
# 使用Agent
result = agent.run("北京今天的天气怎么样?请帮我计算一下明天和后天平均气温")
print(result)
2.2 ReAct模式的Agent
from langchain.agents import Agent, Tool
from langchain.agents.agent_types import AgentType
from langchain.prompts import PromptTemplate
from langchain.schema import AgentAction, AgentFinish
class ReActAgent:
"""ReAct模式的Agent: Reasoning + Acting"""
def __init__(self, llm, tools, max_iterations=10):
self.llm = llm
self.tools = {t.name: t for t in tools}
self.max_iterations = max_iterations
self.prompt = PromptTemplate.from_template("""
你是一个AI助手,需要解决用户的问题。
你的推理过程:
{agent_scratchpad}
Question: {input}
{agent_scratchpad}
当前状态:
{observation}
下一步行动:
""")
def run(self, query: str) -> str:
"""运行Agent"""
observation = ""
agent_scratchpad = ""
steps = []
for i in range(self.max_iterations):
# 构造提示
prompt = self.prompt.format(
input=query,
observation=observation,
agent_scratchpad=agent_scratchpad
)
# 调用LLM决定行动
response = self.llm.predict(prompt)
# 解析响应
action, action_input = self._parse_response(response)
if action == "Final":
return action_input
# 执行行动
if action in self.tools:
observation = self.tools[action].run(action_input)
else:
observation = f"未知工具: {action}"
# 更新思考过程
agent_scratchpad += f"\n思考: {response}\n"
agent_scratchpad += f"行动: {action}({action_input})\n"
agent_scratchpad += f"观察: {observation}\n"
return "达到最大迭代次数"
def _parse_response(self, response: str) -> tuple:
"""解析LLM响应,提取行动"""
# 简化实现
if "FINAL:" in response:
return ("Final", response.split("FINAL:")[1].strip())
# 假设格式: Action: tool_name\nAction Input: input
lines = response.split("\n")
action, action_input = "Unknown", ""
for line in lines:
if line.startswith("Action:"):
action = line.replace("Action:", "").strip()
elif line.startswith("Action Input:"):
action_input = line.replace("Action Input:", "").strip()
return (action, action_input)
# 使用
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(temperature=0)
agent = ReActAgent(llm=llm, tools=[SearchTool(), CalculatorTool()])
result = agent.run("计算一下 123 * 456 + 789")
print(result)

三、AutoGPT与自主Agent
3.1 AutoGPT原理
AutoGPT的核心是自主任务执行循环:
- 目标分解:将大目标分解为可执行的小任务
- 任务执行:调用工具执行当前任务
- 结果评估:评估任务结果是否满足目标
- 自我反思:反思做得好和不好的地方
- 计划更新:根据结果更新后续计划
3.2 自主Agent实现
from typing import List, Dict
from dataclasses import dataclass, field
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Task:
id: str
description: str
status: TaskStatus = TaskStatus.PENDING
result: str = ""
dependencies: List[str] = field(default_factory=list)
created_at: float = 0
completed_at: float = 0
class AutonomousAgent:
"""自主Agent实现"""
def __init__(self, name: str, llm, tools: List):
self.name = name
self.llm = llm
self.tools = {t.name: t for t in tools}
self.tasks: Dict[str, Task] = {}
self.completed_tasks: List[Task] = []
def add_task(self, task: Task):
"""添加任务"""
self.tasks[task.id] = task
def execute_task_loop(self, max_iterations: int = 100):
"""自主执行循环"""
iteration = 0
while iteration < max_iterations:
iteration += 1
# 1. 找出一个可执行的任务
ready_task = self._get_ready_task()
if not ready_task:
# 没有可执行的任务了
if self._all_tasks_complete():
print(f"所有任务完成!共 {len(self.completed_tasks)} 个")
break
else:
print("还有未完成的任务,但没有可执行的")
break
print(f"[{iteration}] 执行任务: {ready_task.description}")
# 2. 执行任务
try:
result = self._execute(ready_task)
ready_task.result = result
ready_task.status = TaskStatus.COMPLETED
self.completed_tasks.append(ready_task)
print(f" 完成: {result[:100]}...")
except Exception as e:
ready_task.status = TaskStatus.FAILED
print(f" 失败: {e}")
def _get_ready_task(self) -> Task:
"""获取可执行的任务"""
for task in self.tasks.values():
if task.status != TaskStatus.PENDING:
continue
# 检查依赖是否都完成
deps_done = all(
self.tasks.get(dep_id, Task(id=dep_id, description="")).status == TaskStatus.COMPLETED
for dep_id in task.dependencies
)
if deps_done:
task.status = TaskStatus.IN_PROGRESS
return task
return None
def _execute(self, task: Task) -> str:
"""执行单个任务"""
# 构造上下文
context = self._build_context()
prompt = f"""你是 {self.name},需要完成任务。
当前上下文:
{context}
任务:{task.description}
请执行任务,返回结果。如果需要使用工具,使用以下格式:
TOOL: tool_name
INPUT: tool_input
"""
response = self.llm.predict(prompt)
# 解析工具调用
if "TOOL:" in response:
tool_name = response.split("TOOL:")[1].split("\n")[0].strip()
tool_input = response.split("INPUT:")[1].split("\n")[0].strip()
if tool_name in self.tools:
return self.tools[tool_name].run(tool_input)
return response
def _build_context(self) -> str:
"""构建执行上下文"""
context = "已完成的任务:\n"
for task in self.completed_tasks:
context += f"- {task.description}: {task.result[:50]}...\n"
return context
def _all_tasks_complete(self) -> bool:
"""检查是否所有任务都完成"""
return all(
t.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]
for t in self.tasks.values()
)
# 使用
agent = AutonomousAgent(
name="研究助手",
llm=ChatOpenAI(temperature=0),
tools=[SearchTool(), CalculatorTool()]
)
# 添加任务
agent.add_task(Task(id="1", description="搜索最新AI研究进展"))
agent.add_task(Task(id="2", description="分析AI发展趋势", dependencies=["1"]))
agent.add_task(Task(id="3", description="总结关键发现", dependencies=["2"]))
agent.execute_task_loop()
四、Agent间协作机制
4.1 消息传递协议
Multi-Agent系统的核心是消息传递:
广播:一个Agent向所有Agent发送消息。
点对点:一个Agent向特定Agent发送消息。
发布-订阅:Agent订阅感兴趣的话题,发布者不需要知道订阅者。
4.2 协作模式实现
from typing import Dict, List, Callable, Any
from dataclasses import dataclass
from enum import Enum
import time
class MessageType(Enum):
REQUEST = "request"
RESPONSE = "response"
BROADCAST = "broadcast"
HEARTBEAT = "heartbeat"
@dataclass
class Message:
from_agent: str
to_agent: str # "*" 表示广播
msg_type: MessageType
content: Any
timestamp: float = 0
def __post_init__(self):
if self.timestamp == 0:
self.timestamp = time.time()
class AgentRegistry:
"""Agent注册中心"""
def __init__(self):
self.agents: Dict[str, Any] = {}
def register(self, name: str, agent: Any):
self.agents[name] = agent
def get(self, name: str) -> Any:
return self.agents.get(name)
def list_agents(self) -> List[str]:
return list(self.agents.keys())
class MessageBus:
"""消息总线 - Agent间通信的中介"""
def __init__(self):
self.registry = AgentRegistry()
self.subscriptions: Dict[str, List[str]] = {} # topic -> [agent_names]
self.message_queue: List[Message] = []
def register_agent(self, agent: Any):
self.registry.register(agent.name, agent)
def subscribe(self, topic: str, agent_name: str):
if topic not in self.subscriptions:
self.subscriptions[topic] = []
if agent_name not in self.subscriptions[topic]:
self.subscriptions[topic].append(agent_name)
def send_message(self, msg: Message):
"""发送消息"""
if msg.to_agent == "*":
# 广播
for agent_name in self.registry.list_agents():
if agent_name != msg.from_agent:
agent = self.registry.get(agent_name)
if agent:
agent.receive_message(msg)
else:
# 点对点
agent = self.registry.get(msg.to_agent)
if agent:
agent.receive_message(msg)
def publish(self, topic: str, msg: Message):
"""发布-订阅"""
for agent_name in self.subscriptions.get(topic, []):
agent = self.registry.get(agent_name)
if agent:
agent.receive_message(msg)
class BaseAgent:
"""Agent基类"""
def __init__(self, name: str, bus: MessageBus):
self.name = name
self.bus = bus
bus.register_agent(self)
def send_message(self, to: str, msg_type: MessageType, content: Any):
msg = Message(from_agent=self.name, to_agent=to, msg_type=msg_type, content=content)
self.bus.send_message(msg)
def broadcast(self, msg_type: MessageType, content: Any):
msg = Message(from_agent=self.name, to_agent="*", msg_type=msg_type, content=content)
self.bus.send_message(msg)
def receive_message(self, msg: Message):
"""处理收到的消息 - 子类实现"""
raise NotImplementedError
# 具体Agent实现
class PlannerAgent(BaseAgent):
"""规划Agent"""
def __init__(self, name: str, bus: MessageBus, llm):
super().__init__(name, bus)
self.llm = llm
def receive_message(self, msg: Message):
if msg.msg_type == MessageType.REQUEST:
# 收到任务请求,进行规划
task = msg.content
plan = self.create_plan(task)
# 向执行Agent发送计划
self.send_message("executor", MessageType.REQUEST, plan)
def create_plan(self, task: str) -> List[str]:
prompt = f"将以下任务分解为步骤:{task}"
response = self.llm.predict(prompt)
# 简化:假设LLM返回逗号分隔的步骤
return [s.strip() for s in response.split(",")]
class ExecutorAgent(BaseAgent):
"""执行Agent"""
def __init__(self, name: str, bus: MessageBus, tools: List):
super().__init__(name, bus)
self.tools = {t.name: t for t in tools}
def receive_message(self, msg: Message):
if msg.msg_type == MessageType.REQUEST:
plan = msg.content
results = []
for step in plan:
# 执行每个步骤
result = self.execute_step(step)
results.append({"step": step, "result": result})
# 广播进度
self.broadcast(MessageType.BROADCAST, {"progress": len(results), "total": len(plan)})
# 向规划Agent报告完成
self.send_message("planner", MessageType.RESPONSE, results)
def execute_step(self, step: str) -> str:
# 简化实现
return f"完成: {step}"
# 使用
bus = MessageBus()
planner = PlannerAgent("planner", bus, ChatOpenAI(temperature=0))
executor = ExecutorAgent("executor", bus, [SearchTool()])
# 发送任务
planner.send_message("planner", MessageType.REQUEST, "帮我研究一下最新的AI技术趋势")

五、Multi-Agent实战项目
5.1 项目:自动化研究助手
需求:构建一个自动化研究助手,可以自主搜索、阅读、总结AI领域的研究进展。
架构设计:
协调Agent:负责任务分解、进度跟踪、结果整合。
搜索Agent:负责搜索相关论文和信息。
阅读Agent:负责阅读和理解论文内容。
写作Agent:负责将研究结果整理成报告。
5.2 完整实现
import asyncio
from typing import List, Dict
from dataclasses import dataclass
@dataclass
class ResearchTask:
topic: str
max_sources: int = 10
output_format: str = "报告"
class CoordinatorAgent(BaseAgent):
"""协调Agent - 管理整个研究流程"""
def __init__(self, name: str, bus: MessageBus, llm):
super().__init__(name, bus)
self.llm = llm
self.pending_results: Dict[str, List] = {}
async def run(self, task: ResearchTask):
"""运行研究任务"""
# 1. 分解任务
subtasks = await self.decompose_task(task)
# 2. 分发给专业Agent
for subtask in subtasks:
if subtask["type"] == "search":
self.send_message("searcher", MessageType.REQUEST, subtask)
elif subtask["type"] == "read":
self.send_message("reader", MessageType.REQUEST, subtask)
# 3. 等待结果整合
# ...
async def decompose_task(self, task: ResearchTask) -> List[Dict]:
prompt = f"""分解以下研究任务为子任务:
任务:{task.topic}
要求:
- 搜索相关论文
- 阅读理解关键内容
- 整理总结
请用JSON格式返回子任务列表。"""
response = self.llm.predict(prompt)
# 解析返回的子任务
return [
{"type": "search", "query": task.topic, "max_results": task.max_sources},
{"type": "read", "papers": ["paper1", "paper2"]},
]
class SearchAgent(BaseAgent):
"""搜索Agent"""
def __init__(self, name: str, bus: MessageBus, search_api):
super().__init__(name, bus)
self.search_api = search_api
async def search(self, query: str, max_results: int) -> List[Dict]:
# 调用搜索API
results = await self.search_api.search(query, max_results)
return [
{
"title": r["title"],
"url": r["url"],
"abstract": r.get("abstract", ""),
"date": r.get("date", "")
}
for r in results
]
class ReaderAgent(BaseAgent):
"""阅读Agent"""
def __init__(self, name: str, bus: MessageBus, llm):
super().__init__(name, bus)
self.llm = llm
async def read_paper(self, paper: Dict) -> Dict:
# 简化实现
return {
"title": paper["title"],
"summary": f"这是{paper['title']}的总结...",
"key_findings": ["发现1", "发现2"],
"limitations": ["局限1"]
}
class WriterAgent(BaseAgent):
"""写作Agent"""
def __init__(self, name: str, bus: MessageBus, llm):
super().__init__(name, bus)
self.llm = llm
async def write_report(self, findings: List[Dict], topic: str) -> str:
prompt = f"""根据以下研究发现,写一份关于{topic}的研究报告:
{findings}
报告要求:
- 结构清晰
- 包含关键发现
- 有深度分析
- 指出局限性
"""
return self.llm.predict(prompt)
# 异步运行
async def main():
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(temperature=0)
bus = MessageBus()
coordinator = CoordinatorAgent("coordinator", bus, llm)
task = ResearchTask(topic="LLM最新研究进展", max_sources=10)
await coordinator.run(task)
# asyncio.run(main())
六、框架对比与选型
6.1 主流框架对比
| 框架 | 特点 | 适用场景 |
|---|---|---|
| LangChain | 功能丰富,生态完善 | 复杂RAG、工具调用 |
| AutoGPT | 自主性强,任务分解 | 需要自主探索的任务 |
| MetaGPT | 多角色协作,SOP驱动 | 软件开发、产品设计 |
| CrewAI | 角色定义清晰,易用 | 快速构建Agent团队 |
6.2 选型建议
简单任务:使用LangChain Agent,配合ReAct模式。
复杂探索:使用AutoGPT或类似的自助Agent框架。
团队协作:使用MetaGPT或CrewAI,定义清晰的Agent角色。
生产系统:基于LangChain构建,添加监控和错误处理。
七、总结
Multi-Agent是AI系统的未来。单Agent能力有限,多Agent协作能解决更复杂的问题。
框架选择要适配场景。不是越复杂的框架越好,要根据具体需求选择。
Agent间协作机制是核心。消息协议、任务分解、结果整合都需要精心设计。
实践出真知。多动手实验,才能真正掌握Multi-Agent的开发。
延伸阅读
- LangChain官方文档
- AutoGPT GitHub
- MetaGPT论文
- CrewAI文档
课后练习
基础题:使用LangChain构建一个有工具调用能力的Agent。
进阶题:用Multi-Agent模式实现一个简单的代码审查系统。
挑战题:构建一个自动化研究助手,可以自主完成一个主题的研究报告。