MRRA LogoMRRA
资源

故障排除

MRRA的常见问题、解决方案和最佳实践

故障排除

本指南涵盖常见问题、解决方案和有效使用MRRA的最佳实践。

常见问题

活动目的问题

问题: 所有活动目的都被分配为"Other"或"其他"

常见原因:

  • 传递LLM配置字典而不是LLM客户端对象
  • LLM API认证问题
  • 图构建时再次调用LLM而不是使用缓存的目的

解决方案:

# ❌ 错误:传递配置字典
acts = ActivityPurposeAssigner(tb, llm=llm_cfg, concurrency=8).assign(acts)

# ✅ 正确:传递LLM客户端对象
from mrra.agents.subagents import make_llm
llm = make_llm(**llm_cfg)  # 创建客户端对象
acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=8).assign(acts)

# ✅ 确保图构建使用缓存的目的
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)

验证:

# 检查目的是否正确分配
purpose_counts = {}
for act in acts:
    purpose = getattr(act, 'purpose', '无目的')
    purpose_counts[purpose] = purpose_counts.get(purpose, 0) + 1

print("目的分布:", purpose_counts)

# 应该看到多种目的,不只是"Other"

问题: LLM API连接失败或超时

常见原因:

  • API密钥或base URL无效
  • 网络连接问题
  • LLM提供商的速率限制
  • 模型名称不正确

解决方案:

# 首先测试LLM连接
def test_llm_connection(llm_cfg):
    try:
        llm = make_llm(**llm_cfg)
        test_response = llm.invoke("测试消息")
        print("✅ LLM连接成功")
        return True
    except Exception as e:
        print(f"❌ LLM连接失败: {e}")
        return False

# 在目的分配中使用之前先测试
if test_llm_connection(llm_cfg):
    llm = make_llm(**llm_cfg)
    acts = ActivityPurposeAssigner(
        tb, 
        llm=llm, 
        concurrency=4,  # 如果被限速则减少并发
        llm_timeout=120  # 增加超时时间
    ).assign(acts)

速率限制处理:

# 使用重试逻辑处理速率限制
import time
import random

def assign_purposes_with_retry(tb, llm, acts, max_retries=3):
    for attempt in range(max_retries):
        try:
            return ActivityPurposeAssigner(
                tb, 
                llm=llm, 
                concurrency=2,  # 降低并发
                llm_timeout=60
            ).assign(acts)
        except Exception as e:
            if "rate limit" in str(e).lower() and attempt < max_retries - 1:
                wait_time = (2 ** attempt) + random.random()
                print(f"速率受限,等待{wait_time:.1f}秒后重试{attempt + 1}")
                time.sleep(wait_time)
            else:
                raise e

问题: 分配的目的模糊或不正确

常见原因:

  • 位置上下文不足
  • 轨迹数据质量差
  • 通用提示未针对领域定制

解决方案:

# 通过更好的上下文增强目的分配
class EnhancedActivityPurposeAssigner(ActivityPurposeAssigner):
    def __init__(self, *args, custom_prompt=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.custom_prompt = custom_prompt
    
    def create_purpose_prompt(self, activity):
        """创建带更多上下文的增强提示"""
        
        base_prompt = super().create_purpose_prompt(activity)
        
        if self.custom_prompt:
            enhanced_prompt = f"""
            {self.custom_prompt}
            
            {base_prompt}
            
            额外上下文:
            - 持续时间:{activity.duration_minutes}分钟
            - 时间:{activity.start}{activity.end}
            - 星期几:{activity.start.strftime('%A')}
            - 小时:{activity.start.hour}
            
            常见目的包括:工作、家庭、用餐、购物、休闲、
            医疗保健、教育、交通、社交、商务。
            """
        else:
            enhanced_prompt = base_prompt
        
        return enhanced_prompt

# 使用增强分配器
custom_prompt = """
您正在分析城市专业人士的移动数据。
在分配目的时考虑典型的日常例行公事和本地上下文。
"""

enhanced_assigner = EnhancedActivityPurposeAssigner(
    tb, 
    llm=llm, 
    custom_prompt=custom_prompt,
    concurrency=4
)
acts = enhanced_assigner.assign(acts)

图和检索问题

图保存错误

问题: networkx has no attribute write_gpickle错误

解决方案: MRRA使用标准pickle进行图序列化:

# ❌ 不要直接使用networkx write_gpickle
# nx.write_gpickle(G, "graph.gpickle")

# ✅ 使用CacheManager进行图操作
from mrra.persist.cache import CacheManager

cm = CacheManager()
cm.save_graph(tb_hash, "mobility_default", mg.G)
cached_graph = cm.load_graph(tb_hash, "mobility_default")

基于目的的检索不工作

问题: 目的上下文不影响检索结果

解决方案: 确保图中存在目的节点并配置检索权重:

# 检查图中是否存在目的节点
purpose_nodes = [n for n in mg.G.nodes() if n.startswith('p_')]
print(f"图中的目的节点: {purpose_nodes}")

# 为目的配置检索权重
from mrra.retriever.graph_rag import GraphRAGGenerate

retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)
retriever.purpose_weight = 0.6   # 增加目的影响
retriever.hour_weight = 0.4
retriever.dow_weight = 0.3

# 在查询中使用目的
docs = retriever.get_relevant_documents({
    "user_id": user_id,
    "purpose": "dining",  # 或者 ["dining", "work"]
    "k": 8
})

# 验证目的影响
for doc in docs:
    print(f"位置: {doc.metadata['node']}, 得分: {doc.metadata['score']:.4f}")

性能和内存问题

活动目的分配缓慢

问题: 目的分配时间过长或使用内存过多

解决方案: 优化并发并实施批处理:

# 基于系统优化并发
import os
import psutil

def get_optimal_concurrency():
    """根据系统资源计算最优并发数"""
    cpu_count = os.cpu_count()
    memory_gb = psutil.virtual_memory().total / (1024**3)
    
    # 保守方法:基于内存和CPU限制
    max_concurrent = min(
        cpu_count * 2,  # 基于CPU的限制
        int(memory_gb),  # 基于内存的限制(每个工作进程1GB)
        10  # 硬限制以防止API过载
    )
    
    return max(1, max_concurrent)

# 使用最优并发
optimal_concurrency = get_optimal_concurrency()
print(f"使用并发数: {optimal_concurrency}")

acts = ActivityPurposeAssigner(
    tb, 
    llm=llm, 
    concurrency=optimal_concurrency,
    llm_timeout=60  # 合理的超时时间
).assign(acts)

大数据集的内存问题

问题: 大轨迹数据集出现内存不足错误

解决方案: 实施批处理和内存管理:

def process_large_dataset_in_batches(tb, batch_size=1000):
    """批量处理大轨迹数据集"""
    
    total_points = len(tb.df)
    results = []
    
    for i in range(0, total_points, batch_size):
        batch_df = tb.df.iloc[i:i+batch_size].copy()
        batch_tb = TrajectoryBatch(batch_df)
        
        print(f"处理第{i//batch_size + 1}批: {len(batch_df)}个点")
        
        # 处理批次
        batch_acts = ActivityExtractor(batch_tb).extract()
        batch_acts = ActivityPurposeAssigner(batch_tb, llm=llm, concurrency=2).assign(batch_acts)
        
        results.extend(batch_acts)
        
        # 强制垃圾回收
        import gc
        gc.collect()
    
    return results

# 用于大数据集
if len(tb.df) > 10000:
    print("检测到大数据集,使用批处理")
    acts = process_large_dataset_in_batches(tb, batch_size=2000)
else:
    acts = ActivityExtractor(tb).extract()
    acts = ActivityPurposeAssigner(tb, llm=llm).assign(acts)

数据质量问题

活动检测不足

问题: 从轨迹数据中检测到的活动太少

解决方案: 调整提取参数:

# 首先分析数据特征
def analyze_trajectory_characteristics(tb):
    """分析轨迹数据以建议参数"""
    
    df = tb.df
    
    # 计算基本统计
    stats = {
        'total_points': len(df),
        'unique_users': df['user_id'].nunique(),
        'time_span_days': (df['timestamp_local'].max() - df['timestamp_local'].min()).days,
        'avg_points_per_user': len(df) / df['user_id'].nunique(),
    }
    
    # 计算连续点之间的典型距离
    distances = []
    for user_id in df['user_id'].unique():
        user_df = df[df['user_id'] == user_id].sort_values('timestamp_local')
        
        for i in range(1, len(user_df)):
            prev_row = user_df.iloc[i-1]
            curr_row = user_df.iloc[i]
            
            # 简单距离计算(近似)
            lat_diff = curr_row['latitude'] - prev_row['latitude']
            lon_diff = curr_row['longitude'] - prev_row['longitude']
            distance = ((lat_diff**2 + lon_diff**2)**0.5) * 111000  # 粗略米数
            
            distances.append(distance)
            
            if len(distances) > 1000:  # 采样以避免内存问题
                break
    
    if distances:
        stats['median_distance_m'] = sorted(distances)[len(distances)//2]
        stats['90th_percentile_distance_m'] = sorted(distances)[int(len(distances)*0.9)]
    
    return stats

# 分析并调整参数
stats = analyze_trajectory_characteristics(tb)
print("轨迹特征:", stats)

# 基于数据建议参数
suggested_radius = min(500, max(100, stats.get('90th_percentile_distance_m', 300)))
suggested_min_dwell = 20 if stats.get('avg_points_per_user', 0) > 1000 else 10

ext_cfg = dict(
    method="radius",
    radius_m=suggested_radius,
    min_dwell_minutes=suggested_min_dwell,
    max_gap_minutes=120  # 对稀疏数据增加
)

print(f"建议的提取配置: {ext_cfg}")
acts = ActivityExtractor(tb, **ext_cfg).extract()
print(f"提取了{len(acts)}个活动")

Geolife数据集问题

数据加载问题

问题: 加载Geolife .plt文件的问题

解决方案: 验证数据结构并使用正确的加载:

def verify_geolife_structure(data_dir):
    """验证Geolife数据结构"""
    import os
    import glob
    
    expected_structure = {}
    
    user_dirs = glob.glob(os.path.join(data_dir, "*"))
    for user_dir in user_dirs:
        if not os.path.isdir(user_dir):
            continue
            
        user_name = os.path.basename(user_dir)
        trajectory_dir = os.path.join(user_dir, "Trajectory")
        
        if os.path.exists(trajectory_dir):
            plt_files = glob.glob(os.path.join(trajectory_dir, "*.plt"))
            expected_structure[user_name] = len(plt_files)
        else:
            expected_structure[user_name] = 0
    
    print("Geolife数据结构:")
    for user, file_count in expected_structure.items():
        print(f"  {user}: {file_count}个.plt文件")
    
    return expected_structure

# 运行脚本前验证
verify_geolife_structure("scripts/Data")

# 指定特定用户运行
import os
os.environ['GEOLIFE_USER'] = 'user_001'  # 指定用户

脚本执行问题

问题: scripts/verify_geolife.py运行失败

解决方案: 检查环境和依赖:

# 确保在正确目录中
cd /path/to/mrra/project

# 检查脚本是否存在
ls scripts/verify_geolife.py

# 使用Python路径运行
python scripts/verify_geolife.py

# 如果有导入错误,检查安装
pip install -e .

# 检查必需的依赖
pip list | grep -E "(pandas|numpy|networkx|langchain)"

最佳实践

开发工作流程

def robust_mrra_pipeline(df, llm_cfg, debug=True):
    """带错误处理和验证的稳健MRRA管道"""
    
    try:
        # 1. 验证输入数据
        if debug:
            print("🔍 验证输入数据...")
        
        required_cols = ['user_id', 'timestamp', 'latitude', 'longitude']
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            raise ValueError(f"缺少必需列: {missing_cols}")
        
        if len(df) == 0:
            raise ValueError("轨迹数据为空")
        
        # 2. 使用缓存初始化
        tb = TrajectoryBatch(df)
        cm = CacheManager()
        tb_hash = compute_tb_hash(tb)
        
        if debug:
            print(f"📊 处理{len(df)}个轨迹点,{tb.df['user_id'].nunique()}个用户")
            print(f"🔑 缓存键: {tb_hash}")
        
        # 3. 首先尝试缓存的活动
        activities_key = "robust_pipeline_activities"
        acts = cm.load_activities(tb_hash, activities_key)
        
        if acts:
            if debug:
                print(f"✅ 加载了{len(acts)}个缓存活动")
        else:
            if debug:
                print("📊 计算活动...")
            
            # 测试LLM连接
            if not test_llm_connection(llm_cfg):
                raise RuntimeError("LLM连接失败")
            
            # 提取活动
            ext_cfg = dict(method="radius", radius_m=300, min_dwell_minutes=30)
            acts = ActivityExtractor(tb, **ext_cfg).extract()
            
            if len(acts) == 0:
                print("⚠️ 未检测到活动,调整参数...")
                ext_cfg['min_dwell_minutes'] = 15
                ext_cfg['radius_m'] = 500
                acts = ActivityExtractor(tb, **ext_cfg).extract()
            
            # 分配目的
            llm = make_llm(**llm_cfg)
            acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=4).assign(acts)
            
            # 缓存结果
            cm.save_activities(tb_hash, activities_key, acts)
            
            if debug:
                print(f"💾 缓存了{len(acts)}个活动")
        
        # 4. 构建图
        graph_key = "robust_pipeline_graph"
        cached_graph = cm.load_graph(tb_hash, graph_key)
        
        if cached_graph:
            if debug:
                print("✅ 加载了缓存的移动图")
            cfg = GraphConfig(grid_size_m=200, use_activities=True)
            mg = MobilityGraph(tb, cfg)
            mg.G = cached_graph
        else:
            if debug:
                print("📊 构建移动图...")
            cfg = GraphConfig(grid_size_m=200, use_activities=True)
            mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
            cm.save_graph(tb_hash, graph_key, mg.G)
        
        # 5. 创建智能体
        retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)
        
        reflection_cfg = dict(
            max_round=1,
            subAgents=[
                {"name": "temporal", "prompt": "从选项中选择最可能的位置id。"},
                {"name": "spatial", "prompt": "从选项中选择最可能的位置id。"},
            ],
            aggregator="confidence_weighted_voting"
        )
        
        agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)
        
        if debug:
            print("✅ MRRA管道成功完成")
        
        return agent, {
            'trajectory_batch': tb,
            'activities': acts,
            'mobility_graph': mg,
            'cache_hash': tb_hash
        }
        
    except Exception as e:
        print(f"❌ 管道失败: {e}")
        import traceback
        if debug:
            traceback.print_exc()
        raise e

# 带错误处理的使用
try:
    agent, components = robust_mrra_pipeline(df, llm_cfg, debug=True)
    
    # 测试预测
    user_id = components['trajectory_batch'].users()[0]
    result = agent.invoke({
        "task": "next_position",
        "user_id": user_id
    })
    print("✅ 预测成功:", result)
    
except Exception as e:
    print(f"❌ MRRA管道错误: {e}")

常见陷阱:

  • 始终使用make_llm(**llm_cfg)创建LLM客户端对象
  • 在图构建中使用缓存活动时设置assume_purposes_assigned=True
  • 缓存带目的的活动以避免昂贵的LLM重新计算
  • 监控大数据集的内存使用
  • 在运行昂贵操作之前测试LLM连接性

调试工具

调试模式配置

import logging

# 启用调试日志
logging.basicConfig(level=logging.DEBUG)

# MRRA特定的调试标志
debug_config = {
    'verbose_activity_extraction': True,
    'log_llm_calls': True,
    'save_intermediate_results': True,
    'validate_graph_construction': True
}

# 在管道中使用
def debug_mrra_pipeline(df, llm_cfg, debug_config):
    """带全面调试的管道"""
    
    if debug_config.get('verbose_activity_extraction'):
        print("🔍 活动提取详情:")
        tb = TrajectoryBatch(df)
        
        for user_id in tb.users()[:1]:  # 调试第一个用户
            user_data = tb.for_user(user_id)
            print(f"  用户{user_id}: {len(user_data)}个点")
            print(f"  时间范围: {user_data['timestamp_local'].min()}{user_data['timestamp_local'].max()}")
    
    # 继续常规管道...

性能分析

import time
import psutil
from functools import wraps

def profile_performance(func):
    """分析函数性能的装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        process = psutil.Process()
        
        start_time = time.time()
        start_memory = process.memory_info().rss / 1024 / 1024  # MB
        
        result = func(*args, **kwargs)
        
        end_time = time.time()
        end_memory = process.memory_info().rss / 1024 / 1024  # MB
        
        print(f"⏱️ {func.__name__}: {end_time - start_time:.2f}秒, 内存: {start_memory:.1f}MB → {end_memory:.1f}MB")
        
        return result
    return wrapper

# 使用性能分析
@profile_performance
def profile_activity_extraction(tb):
    return ActivityExtractor(tb).extract()

@profile_performance  
def profile_purpose_assignment(tb, llm, acts):
    return ActivityPurposeAssigner(tb, llm=llm).assign(acts)

# 分析你的管道
acts = profile_activity_extraction(tb)
acts = profile_purpose_assignment(tb, llm, acts)

获取帮助:

  • 查看现有的GitHub问题寻找类似问题
  • 报告错误时创建最小可重现示例
  • 包含错误消息、堆栈跟踪和环境信息
  • 首先使用较小的数据集测试以隔离问题

下一步

  • 查看配置了解高级设置选项
  • 探索开发以贡献修复和改进
  • 查看示例了解工作代码示例