MRRA LogoMRRA
核心概念

多智能体反思

多智能体反思系统的原理和配置

多智能体反思

多智能体反思是MRRA中的核心机制,使用多个专业化子智能体协作解决移动性预测问题。每个子智能体带来不同的专业知识和视角,然后它们的输出被聚合以获得稳健、可解释的结果。

核心概念

多智能体反思系统通过以下步骤工作:

  1. 多个子智能体:每个专业化于不同方面(时间、空间、路线、目的驱动)
  2. 结构化响应:每个子智能体提供selection/path_idsconfidencerationale
  3. 聚合:聚合器使用各种策略组合响应
  4. 工具集成:子智能体可以通过MCP协议访问外部工具

关键优势:

  • 稳健性:多种视角减少单点故障
  • 可解释性:每个子智能体提供推理理由
  • 可扩展性:易于添加新的专业智能体或工具

基本配置

简单的双智能体设置

from mrra.agents.builder import build_mrra_agent

# 基本反思配置
reflection_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "temporal", 
            "prompt": "从选项中选择最可能的位置id(选择),不要输出坐标。"
        },
        {
            "name": "spatial",  
            "prompt": "从选项中选择最可能的位置id(选择),不要输出坐标。"
        },
    ],
    aggregator="confidence_weighted_voting",
)

# 使用反思构建智能体
agent = build_mrra_agent(
    llm=llm_cfg, 
    retriever=retriever, 
    reflection=reflection_cfg
)

# 执行预测
result = agent.invoke({
    "task": "next_position", 
    "user_id": "user_123", 
    "t": "2024-09-10 12:30:00"
})

高级多智能体配置

# 带专业智能体的高级配置
advanced_reflection_cfg = dict(
    max_round=2,  # 允许多轮反思
    subAgents=[
        {
            "name": "temporal_expert",
            "prompt": """你是一个时间移动性专家。分析时间模式并基于以下内容选择最可能的位置:
            - 一天中的时间偏好
            - 星期几模式
            - 历史时间模式
            从选项中选择(选择)并提供置信度得分。""",
            "temperature": 0.1,  # 低温度保持一致性
        },
        {
            "name": "spatial_expert", 
            "prompt": """你是一个空间移动性专家。分析位置模式并基于以下内容选择最可能的位置:
            - 空间聚类和热点
            - 距离和可达性
            - 地理上下文
            从选项中选择(选择)并提供置信度得分。""",
            "temperature": 0.1,
        },
        {
            "name": "purpose_expert",
            "prompt": """你是一个目的驱动的移动性专家。分析活动目的并基于以下内容选择最可能的位置:
            - 活动目的模式
            - 序列活动逻辑
            - 上下文适合的位置
            从选项中选择(选择)并提供置信度得分。""",
            "temperature": 0.2,
        },
        {
            "name": "routing_expert",
            "prompt": """你是一个路线和转换专家。分析移动模式并基于以下内容选择最可能的位置:
            - 常见转换路线
            - 移动效率
            - 序列位置逻辑
            从选项中选择(选择)并提供置信度得分。""",
            "temperature": 0.15,
        }
    ],
    aggregator="confidence_weighted_voting",
    consensus_threshold=0.7,  # 最终决策需要高共识
)

子智能体专业化

时间子智能体

专注于基于时间的模式和偏好:

{
    "name": "hourly_expert",
    "prompt": """分析每小时移动模式:
    - 峰值活动时间(7-9am, 12-2pm, 5-7pm)
    - 非峰值行为模式
    - 午餐时间(11am-2pm)  
    - 晚间例行公事(6-10pm)
    
    考虑当前时间上下文并选择最适合时间的位置。"""
}
{
    "name": "weekly_expert", 
    "prompt": """分析周移动模式:
    - 工作日 vs 周末行为
    - 周一早上模式
    - 周五晚上模式
    - 周末休闲活动
    
    考虑星期几上下文并相应选择。"""
}
{
    "name": "seasonal_expert",
    "prompt": """分析季节和日历模式:
    - 天气对移动性的影响
    - 节假日和特殊事件模式
    - 季节活动偏好
    - 学术日历影响
    
    在你的选择中考虑季节上下文。"""
}

空间子智能体

专注于地理和基于位置的分析:

spatial_agents = [
    {
        "name": "clustering_expert",
        "prompt": """分析空间聚类模式:
        - 识别活动热点和集群
        - 考虑空间密度和可达性
        - 评估距离关系
        - 评估地理约束"""
    },
    {
        "name": "poi_expert", 
        "prompt": """分析兴趣点模式:
        - 考虑POI类别和目的
        - 评估营业时间和可用性
        - 评估POI受欢迎度和容量
        - 考虑服务可达性"""
    },
    {
        "name": "network_expert",
        "prompt": """分析交通网络因素:
        - 考虑路线可达性
        - 评估旅行时间和距离
        - 评估交通方式偏好  
        - 考虑网络连通性"""
    }
]

目的驱动的子智能体

专注于活动目的和上下文逻辑:

purpose_agents = [
    {
        "name": "activity_sequence_expert",
        "prompt": """分析活动序列逻辑:
        - 常见活动转换(工作→用餐→工作→家)
        - 目的驱动的位置选择
        - 活动持续时间期望
        - 序列一致性"""
    },
    {
        "name": "lifestyle_expert",
        "prompt": """分析生活方式和行为模式:
        - 个人偏好和习惯
        - 常规 vs 自发行为
        - 社交和文化因素
        - 个体移动性风格"""
    }
]

聚合策略

置信度加权投票

默认的聚合策略,按置信度加权每个子智能体的投票:

def confidence_weighted_voting(subagent_responses):
    """使用置信度加权投票聚合响应"""
    
    location_scores = {}
    total_confidence = 0
    
    for response in subagent_responses:
        selection = response.get('selection')
        confidence = response.get('confidence', 0.5)
        
        if selection:
            location_scores[selection] = location_scores.get(selection, 0) + confidence
            total_confidence += confidence
    
    if not location_scores:
        return None
    
    # 标准化得分
    for location in location_scores:
        location_scores[location] /= total_confidence
    
    # 选择最高得分位置
    best_location = max(location_scores.items(), key=lambda x: x[1])
    
    return {
        'selection': best_location[0],
        'confidence': best_location[1],
        'scores': location_scores,
        'rationale': f"由{len(location_scores)}个智能体选择,平均置信度{best_location[1]:.3f}"
    }

自定义聚合策略

class ConsensusAggregator:
    def __init__(self, consensus_threshold=0.6):
        self.consensus_threshold = consensus_threshold
    
    def aggregate(self, subagent_responses):
        """需要高于阈值的共识"""
        
        selections = [r.get('selection') for r in subagent_responses if r.get('selection')]
        
        if not selections:
            return None
        
        # 计数选择
        selection_counts = {}
        for selection in selections:
            selection_counts[selection] = selection_counts.get(selection, 0) + 1
        
        # 检查共识
        total_agents = len(subagent_responses)
        for selection, count in selection_counts.items():
            if count / total_agents >= self.consensus_threshold:
                avg_confidence = sum(
                    r['confidence'] for r in subagent_responses 
                    if r.get('selection') == selection
                ) / count
                
                return {
                    'selection': selection,
                    'confidence': avg_confidence,
                    'consensus_ratio': count / total_agents,
                    'rationale': f"由{count}/{total_agents}个智能体达成共识"
                }
        
        # 未达成共识
        return {
            'selection': max(selection_counts.items(), key=lambda x: x[1])[0],
            'confidence': 0.3,  # 缺乏共识的低置信度
            'consensus_ratio': max(selection_counts.values()) / total_agents,
            'rationale': "没有强共识,选择最受欢迎的选项"
        }

通过MCP的工具集成

子智能体可以通过MCP(模型上下文协议)框架访问外部工具:

天气工具集成

reflection_cfg_with_weather = dict(
    max_round=1,
    subAgents=[
        {
            "name": "weather_aware_agent",
            "prompt": """在位置选择中考虑天气条件:
            - 室内 vs 室外活动偏好
            - 天气适宜的位置
            - 季节活动调整
            从选项中选择并解释天气影响。""",
            "mcp": {
                "weather": {}  # 启用天气工具
            }
        },
        {
            "name": "baseline_agent",
            "prompt": "基于历史模式选择,不考虑外部上下文。"
        }
    ],
    aggregator="confidence_weighted_voting"
)

地图和POI集成

reflection_cfg_with_maps = dict(
    max_round=1,
    subAgents=[
        {
            "name": "poi_aware_agent", 
            "prompt": """使用附近POI信息来告知位置选择:
            - 营业时间和可用性
            - POI类别和服务
            - 热门目的地
            使用POI上下文从选项中选择。""",
            "mcp": {
                "maps": {},  # 启用POI查找工具
                "gmap": {    # 启用远程地图服务
                    "url": "https://your-mcp-server/sse",
                    "transport": "sse"
                }
            }
        }
    ],
    aggregator="confidence_weighted_voting"
)

高级反思模式

多轮反思

为复杂决策启用多轮反思:

multi_round_cfg = dict(
    max_round=3,
    subAgents=[
        {
            "name": "initial_analyzer",
            "prompt": "提供初始分析和广泛的位置候选。"
        },
        {
            "name": "detail_evaluator", 
            "prompt": "详细评估初始建议并精化选择。"
        },
        {
            "name": "final_validator",
            "prompt": "根据所有可用证据验证最终选择。"
        }
    ],
    aggregator="consensus_with_validation",
    round_feedback=True  # 允许智能体看到前一轮结果
)

性能和优化

并行子智能体执行

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def execute_subagents_parallel(subagents, query, llm):
    """并行执行子智能体以获得更好的性能"""
    
    async def execute_single_agent(agent_config):
        # 执行单个子智能体
        result = await llm.ainvoke(agent_config['prompt'] + f"\n查询: {query}")
        return {
            'agent': agent_config['name'],
            'result': result,
            'timestamp': datetime.now().isoformat()
        }
    
    # 并发执行所有子智能体  
    tasks = [execute_single_agent(agent) for agent in subagents]
    results = await asyncio.gather(*tasks)
    
    return results

性能考虑:

  • 并行执行可以显著加速多智能体系统
  • 为重复的相似查询缓存子智能体结果
  • 考虑子智能体复杂性 vs 准确性权衡
  • 监控多智能体LLM API成本

调试和监控

子智能体响应分析

def analyze_subagent_responses(responses):
    """分析子智能体响应模式"""
    
    analysis = {
        'response_count': len(responses),
        'confidence_stats': {},
        'selection_distribution': {},
        'reasoning_themes': []
    }
    
    # 置信度统计
    confidences = [r.get('confidence', 0) for r in responses]
    analysis['confidence_stats'] = {
        'mean': sum(confidences) / len(confidences) if confidences else 0,
        'min': min(confidences) if confidences else 0,
        'max': max(confidences) if confidences else 0,
        'std': np.std(confidences) if confidences else 0
    }
    
    # 选择分布
    selections = [r.get('selection') for r in responses if r.get('selection')]
    for selection in selections:
        analysis['selection_distribution'][selection] = analysis['selection_distribution'].get(selection, 0) + 1
    
    # 推理分析(简化)
    rationales = [r.get('rationale', '') for r in responses]
    common_terms = {}
    for rationale in rationales:
        words = rationale.lower().split()
        for word in words:
            if len(word) > 3:  # 过滤短词
                common_terms[word] = common_terms.get(word, 0) + 1
    
    analysis['reasoning_themes'] = sorted(common_terms.items(), key=lambda x: x[1], reverse=True)[:10]
    
    return analysis

下一步