核心概念
多智能体反思
多智能体反思系统的原理和配置
多智能体反思
多智能体反思是MRRA中的核心机制,使用多个专业化子智能体协作解决移动性预测问题。每个子智能体带来不同的专业知识和视角,然后它们的输出被聚合以获得稳健、可解释的结果。
核心概念
多智能体反思系统通过以下步骤工作:
- 多个子智能体:每个专业化于不同方面(时间、空间、路线、目的驱动)
- 结构化响应:每个子智能体提供
selection/path_ids、confidence和rationale - 聚合:聚合器使用各种策略组合响应
- 工具集成:子智能体可以通过MCP协议访问外部工具
关键优势:
- 稳健性:多种视角减少单点故障
- 可解释性:每个子智能体提供推理理由
- 可扩展性:易于添加新的专业智能体或工具
基本配置
简单的双智能体设置
from mrra.agents.builder import build_mrra_agent
# 基本反思配置
reflection_cfg = dict(
max_round=1,
subAgents=[
{
"name": "temporal",
"prompt": "从选项中选择最可能的位置id(选择),不要输出坐标。"
},
{
"name": "spatial",
"prompt": "从选项中选择最可能的位置id(选择),不要输出坐标。"
},
],
aggregator="confidence_weighted_voting",
)
# 使用反思构建智能体
agent = build_mrra_agent(
llm=llm_cfg,
retriever=retriever,
reflection=reflection_cfg
)
# 执行预测
result = agent.invoke({
"task": "next_position",
"user_id": "user_123",
"t": "2024-09-10 12:30:00"
})高级多智能体配置
# 带专业智能体的高级配置
advanced_reflection_cfg = dict(
max_round=2, # 允许多轮反思
subAgents=[
{
"name": "temporal_expert",
"prompt": """你是一个时间移动性专家。分析时间模式并基于以下内容选择最可能的位置:
- 一天中的时间偏好
- 星期几模式
- 历史时间模式
从选项中选择(选择)并提供置信度得分。""",
"temperature": 0.1, # 低温度保持一致性
},
{
"name": "spatial_expert",
"prompt": """你是一个空间移动性专家。分析位置模式并基于以下内容选择最可能的位置:
- 空间聚类和热点
- 距离和可达性
- 地理上下文
从选项中选择(选择)并提供置信度得分。""",
"temperature": 0.1,
},
{
"name": "purpose_expert",
"prompt": """你是一个目的驱动的移动性专家。分析活动目的并基于以下内容选择最可能的位置:
- 活动目的模式
- 序列活动逻辑
- 上下文适合的位置
从选项中选择(选择)并提供置信度得分。""",
"temperature": 0.2,
},
{
"name": "routing_expert",
"prompt": """你是一个路线和转换专家。分析移动模式并基于以下内容选择最可能的位置:
- 常见转换路线
- 移动效率
- 序列位置逻辑
从选项中选择(选择)并提供置信度得分。""",
"temperature": 0.15,
}
],
aggregator="confidence_weighted_voting",
consensus_threshold=0.7, # 最终决策需要高共识
)子智能体专业化
时间子智能体
专注于基于时间的模式和偏好:
{
"name": "hourly_expert",
"prompt": """分析每小时移动模式:
- 峰值活动时间(7-9am, 12-2pm, 5-7pm)
- 非峰值行为模式
- 午餐时间(11am-2pm)
- 晚间例行公事(6-10pm)
考虑当前时间上下文并选择最适合时间的位置。"""
}{
"name": "weekly_expert",
"prompt": """分析周移动模式:
- 工作日 vs 周末行为
- 周一早上模式
- 周五晚上模式
- 周末休闲活动
考虑星期几上下文并相应选择。"""
}{
"name": "seasonal_expert",
"prompt": """分析季节和日历模式:
- 天气对移动性的影响
- 节假日和特殊事件模式
- 季节活动偏好
- 学术日历影响
在你的选择中考虑季节上下文。"""
}空间子智能体
专注于地理和基于位置的分析:
spatial_agents = [
{
"name": "clustering_expert",
"prompt": """分析空间聚类模式:
- 识别活动热点和集群
- 考虑空间密度和可达性
- 评估距离关系
- 评估地理约束"""
},
{
"name": "poi_expert",
"prompt": """分析兴趣点模式:
- 考虑POI类别和目的
- 评估营业时间和可用性
- 评估POI受欢迎度和容量
- 考虑服务可达性"""
},
{
"name": "network_expert",
"prompt": """分析交通网络因素:
- 考虑路线可达性
- 评估旅行时间和距离
- 评估交通方式偏好
- 考虑网络连通性"""
}
]目的驱动的子智能体
专注于活动目的和上下文逻辑:
purpose_agents = [
{
"name": "activity_sequence_expert",
"prompt": """分析活动序列逻辑:
- 常见活动转换(工作→用餐→工作→家)
- 目的驱动的位置选择
- 活动持续时间期望
- 序列一致性"""
},
{
"name": "lifestyle_expert",
"prompt": """分析生活方式和行为模式:
- 个人偏好和习惯
- 常规 vs 自发行为
- 社交和文化因素
- 个体移动性风格"""
}
]聚合策略
置信度加权投票
默认的聚合策略,按置信度加权每个子智能体的投票:
def confidence_weighted_voting(subagent_responses):
"""使用置信度加权投票聚合响应"""
location_scores = {}
total_confidence = 0
for response in subagent_responses:
selection = response.get('selection')
confidence = response.get('confidence', 0.5)
if selection:
location_scores[selection] = location_scores.get(selection, 0) + confidence
total_confidence += confidence
if not location_scores:
return None
# 标准化得分
for location in location_scores:
location_scores[location] /= total_confidence
# 选择最高得分位置
best_location = max(location_scores.items(), key=lambda x: x[1])
return {
'selection': best_location[0],
'confidence': best_location[1],
'scores': location_scores,
'rationale': f"由{len(location_scores)}个智能体选择,平均置信度{best_location[1]:.3f}"
}自定义聚合策略
class ConsensusAggregator:
def __init__(self, consensus_threshold=0.6):
self.consensus_threshold = consensus_threshold
def aggregate(self, subagent_responses):
"""需要高于阈值的共识"""
selections = [r.get('selection') for r in subagent_responses if r.get('selection')]
if not selections:
return None
# 计数选择
selection_counts = {}
for selection in selections:
selection_counts[selection] = selection_counts.get(selection, 0) + 1
# 检查共识
total_agents = len(subagent_responses)
for selection, count in selection_counts.items():
if count / total_agents >= self.consensus_threshold:
avg_confidence = sum(
r['confidence'] for r in subagent_responses
if r.get('selection') == selection
) / count
return {
'selection': selection,
'confidence': avg_confidence,
'consensus_ratio': count / total_agents,
'rationale': f"由{count}/{total_agents}个智能体达成共识"
}
# 未达成共识
return {
'selection': max(selection_counts.items(), key=lambda x: x[1])[0],
'confidence': 0.3, # 缺乏共识的低置信度
'consensus_ratio': max(selection_counts.values()) / total_agents,
'rationale': "没有强共识,选择最受欢迎的选项"
}通过MCP的工具集成
子智能体可以通过MCP(模型上下文协议)框架访问外部工具:
天气工具集成
reflection_cfg_with_weather = dict(
max_round=1,
subAgents=[
{
"name": "weather_aware_agent",
"prompt": """在位置选择中考虑天气条件:
- 室内 vs 室外活动偏好
- 天气适宜的位置
- 季节活动调整
从选项中选择并解释天气影响。""",
"mcp": {
"weather": {} # 启用天气工具
}
},
{
"name": "baseline_agent",
"prompt": "基于历史模式选择,不考虑外部上下文。"
}
],
aggregator="confidence_weighted_voting"
)地图和POI集成
reflection_cfg_with_maps = dict(
max_round=1,
subAgents=[
{
"name": "poi_aware_agent",
"prompt": """使用附近POI信息来告知位置选择:
- 营业时间和可用性
- POI类别和服务
- 热门目的地
使用POI上下文从选项中选择。""",
"mcp": {
"maps": {}, # 启用POI查找工具
"gmap": { # 启用远程地图服务
"url": "https://your-mcp-server/sse",
"transport": "sse"
}
}
}
],
aggregator="confidence_weighted_voting"
)高级反思模式
多轮反思
为复杂决策启用多轮反思:
multi_round_cfg = dict(
max_round=3,
subAgents=[
{
"name": "initial_analyzer",
"prompt": "提供初始分析和广泛的位置候选。"
},
{
"name": "detail_evaluator",
"prompt": "详细评估初始建议并精化选择。"
},
{
"name": "final_validator",
"prompt": "根据所有可用证据验证最终选择。"
}
],
aggregator="consensus_with_validation",
round_feedback=True # 允许智能体看到前一轮结果
)性能和优化
并行子智能体执行
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def execute_subagents_parallel(subagents, query, llm):
"""并行执行子智能体以获得更好的性能"""
async def execute_single_agent(agent_config):
# 执行单个子智能体
result = await llm.ainvoke(agent_config['prompt'] + f"\n查询: {query}")
return {
'agent': agent_config['name'],
'result': result,
'timestamp': datetime.now().isoformat()
}
# 并发执行所有子智能体
tasks = [execute_single_agent(agent) for agent in subagents]
results = await asyncio.gather(*tasks)
return results性能考虑:
- 并行执行可以显著加速多智能体系统
- 为重复的相似查询缓存子智能体结果
- 考虑子智能体复杂性 vs 准确性权衡
- 监控多智能体LLM API成本
调试和监控
子智能体响应分析
def analyze_subagent_responses(responses):
"""分析子智能体响应模式"""
analysis = {
'response_count': len(responses),
'confidence_stats': {},
'selection_distribution': {},
'reasoning_themes': []
}
# 置信度统计
confidences = [r.get('confidence', 0) for r in responses]
analysis['confidence_stats'] = {
'mean': sum(confidences) / len(confidences) if confidences else 0,
'min': min(confidences) if confidences else 0,
'max': max(confidences) if confidences else 0,
'std': np.std(confidences) if confidences else 0
}
# 选择分布
selections = [r.get('selection') for r in responses if r.get('selection')]
for selection in selections:
analysis['selection_distribution'][selection] = analysis['selection_distribution'].get(selection, 0) + 1
# 推理分析(简化)
rationales = [r.get('rationale', '') for r in responses]
common_terms = {}
for rationale in rationales:
words = rationale.lower().split()
for word in words:
if len(word) > 3: # 过滤短词
common_terms[word] = common_terms.get(word, 0) + 1
analysis['reasoning_themes'] = sorted(common_terms.items(), key=lambda x: x[1], reverse=True)[:10]
return analysis