资源
故障排除
MRRA的常见问题、解决方案和最佳实践
故障排除
本指南涵盖常见问题、解决方案和有效使用MRRA的最佳实践。
常见问题
活动目的问题
问题: 所有活动目的都被分配为"Other"或"其他"
常见原因:
- 传递LLM配置字典而不是LLM客户端对象
- LLM API认证问题
- 图构建时再次调用LLM而不是使用缓存的目的
解决方案:
# ❌ 错误:传递配置字典
acts = ActivityPurposeAssigner(tb, llm=llm_cfg, concurrency=8).assign(acts)
# ✅ 正确:传递LLM客户端对象
from mrra.agents.subagents import make_llm
llm = make_llm(**llm_cfg) # 创建客户端对象
acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=8).assign(acts)
# ✅ 确保图构建使用缓存的目的
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)验证:
# 检查目的是否正确分配
purpose_counts = {}
for act in acts:
purpose = getattr(act, 'purpose', '无目的')
purpose_counts[purpose] = purpose_counts.get(purpose, 0) + 1
print("目的分布:", purpose_counts)
# 应该看到多种目的,不只是"Other"问题: LLM API连接失败或超时
常见原因:
- API密钥或base URL无效
- 网络连接问题
- LLM提供商的速率限制
- 模型名称不正确
解决方案:
# 首先测试LLM连接
def test_llm_connection(llm_cfg):
try:
llm = make_llm(**llm_cfg)
test_response = llm.invoke("测试消息")
print("✅ LLM连接成功")
return True
except Exception as e:
print(f"❌ LLM连接失败: {e}")
return False
# 在目的分配中使用之前先测试
if test_llm_connection(llm_cfg):
llm = make_llm(**llm_cfg)
acts = ActivityPurposeAssigner(
tb,
llm=llm,
concurrency=4, # 如果被限速则减少并发
llm_timeout=120 # 增加超时时间
).assign(acts)速率限制处理:
# 使用重试逻辑处理速率限制
import time
import random
def assign_purposes_with_retry(tb, llm, acts, max_retries=3):
for attempt in range(max_retries):
try:
return ActivityPurposeAssigner(
tb,
llm=llm,
concurrency=2, # 降低并发
llm_timeout=60
).assign(acts)
except Exception as e:
if "rate limit" in str(e).lower() and attempt < max_retries - 1:
wait_time = (2 ** attempt) + random.random()
print(f"速率受限,等待{wait_time:.1f}秒后重试{attempt + 1}")
time.sleep(wait_time)
else:
raise e问题: 分配的目的模糊或不正确
常见原因:
- 位置上下文不足
- 轨迹数据质量差
- 通用提示未针对领域定制
解决方案:
# 通过更好的上下文增强目的分配
class EnhancedActivityPurposeAssigner(ActivityPurposeAssigner):
def __init__(self, *args, custom_prompt=None, **kwargs):
super().__init__(*args, **kwargs)
self.custom_prompt = custom_prompt
def create_purpose_prompt(self, activity):
"""创建带更多上下文的增强提示"""
base_prompt = super().create_purpose_prompt(activity)
if self.custom_prompt:
enhanced_prompt = f"""
{self.custom_prompt}
{base_prompt}
额外上下文:
- 持续时间:{activity.duration_minutes}分钟
- 时间:{activity.start}至{activity.end}
- 星期几:{activity.start.strftime('%A')}
- 小时:{activity.start.hour}
常见目的包括:工作、家庭、用餐、购物、休闲、
医疗保健、教育、交通、社交、商务。
"""
else:
enhanced_prompt = base_prompt
return enhanced_prompt
# 使用增强分配器
custom_prompt = """
您正在分析城市专业人士的移动数据。
在分配目的时考虑典型的日常例行公事和本地上下文。
"""
enhanced_assigner = EnhancedActivityPurposeAssigner(
tb,
llm=llm,
custom_prompt=custom_prompt,
concurrency=4
)
acts = enhanced_assigner.assign(acts)图和检索问题
图保存错误
问题: networkx has no attribute write_gpickle错误
解决方案: MRRA使用标准pickle进行图序列化:
# ❌ 不要直接使用networkx write_gpickle
# nx.write_gpickle(G, "graph.gpickle")
# ✅ 使用CacheManager进行图操作
from mrra.persist.cache import CacheManager
cm = CacheManager()
cm.save_graph(tb_hash, "mobility_default", mg.G)
cached_graph = cm.load_graph(tb_hash, "mobility_default")基于目的的检索不工作
问题: 目的上下文不影响检索结果
解决方案: 确保图中存在目的节点并配置检索权重:
# 检查图中是否存在目的节点
purpose_nodes = [n for n in mg.G.nodes() if n.startswith('p_')]
print(f"图中的目的节点: {purpose_nodes}")
# 为目的配置检索权重
from mrra.retriever.graph_rag import GraphRAGGenerate
retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)
retriever.purpose_weight = 0.6 # 增加目的影响
retriever.hour_weight = 0.4
retriever.dow_weight = 0.3
# 在查询中使用目的
docs = retriever.get_relevant_documents({
"user_id": user_id,
"purpose": "dining", # 或者 ["dining", "work"]
"k": 8
})
# 验证目的影响
for doc in docs:
print(f"位置: {doc.metadata['node']}, 得分: {doc.metadata['score']:.4f}")性能和内存问题
活动目的分配缓慢
问题: 目的分配时间过长或使用内存过多
解决方案: 优化并发并实施批处理:
# 基于系统优化并发
import os
import psutil
def get_optimal_concurrency():
"""根据系统资源计算最优并发数"""
cpu_count = os.cpu_count()
memory_gb = psutil.virtual_memory().total / (1024**3)
# 保守方法:基于内存和CPU限制
max_concurrent = min(
cpu_count * 2, # 基于CPU的限制
int(memory_gb), # 基于内存的限制(每个工作进程1GB)
10 # 硬限制以防止API过载
)
return max(1, max_concurrent)
# 使用最优并发
optimal_concurrency = get_optimal_concurrency()
print(f"使用并发数: {optimal_concurrency}")
acts = ActivityPurposeAssigner(
tb,
llm=llm,
concurrency=optimal_concurrency,
llm_timeout=60 # 合理的超时时间
).assign(acts)大数据集的内存问题
问题: 大轨迹数据集出现内存不足错误
解决方案: 实施批处理和内存管理:
def process_large_dataset_in_batches(tb, batch_size=1000):
"""批量处理大轨迹数据集"""
total_points = len(tb.df)
results = []
for i in range(0, total_points, batch_size):
batch_df = tb.df.iloc[i:i+batch_size].copy()
batch_tb = TrajectoryBatch(batch_df)
print(f"处理第{i//batch_size + 1}批: {len(batch_df)}个点")
# 处理批次
batch_acts = ActivityExtractor(batch_tb).extract()
batch_acts = ActivityPurposeAssigner(batch_tb, llm=llm, concurrency=2).assign(batch_acts)
results.extend(batch_acts)
# 强制垃圾回收
import gc
gc.collect()
return results
# 用于大数据集
if len(tb.df) > 10000:
print("检测到大数据集,使用批处理")
acts = process_large_dataset_in_batches(tb, batch_size=2000)
else:
acts = ActivityExtractor(tb).extract()
acts = ActivityPurposeAssigner(tb, llm=llm).assign(acts)数据质量问题
活动检测不足
问题: 从轨迹数据中检测到的活动太少
解决方案: 调整提取参数:
# 首先分析数据特征
def analyze_trajectory_characteristics(tb):
"""分析轨迹数据以建议参数"""
df = tb.df
# 计算基本统计
stats = {
'total_points': len(df),
'unique_users': df['user_id'].nunique(),
'time_span_days': (df['timestamp_local'].max() - df['timestamp_local'].min()).days,
'avg_points_per_user': len(df) / df['user_id'].nunique(),
}
# 计算连续点之间的典型距离
distances = []
for user_id in df['user_id'].unique():
user_df = df[df['user_id'] == user_id].sort_values('timestamp_local')
for i in range(1, len(user_df)):
prev_row = user_df.iloc[i-1]
curr_row = user_df.iloc[i]
# 简单距离计算(近似)
lat_diff = curr_row['latitude'] - prev_row['latitude']
lon_diff = curr_row['longitude'] - prev_row['longitude']
distance = ((lat_diff**2 + lon_diff**2)**0.5) * 111000 # 粗略米数
distances.append(distance)
if len(distances) > 1000: # 采样以避免内存问题
break
if distances:
stats['median_distance_m'] = sorted(distances)[len(distances)//2]
stats['90th_percentile_distance_m'] = sorted(distances)[int(len(distances)*0.9)]
return stats
# 分析并调整参数
stats = analyze_trajectory_characteristics(tb)
print("轨迹特征:", stats)
# 基于数据建议参数
suggested_radius = min(500, max(100, stats.get('90th_percentile_distance_m', 300)))
suggested_min_dwell = 20 if stats.get('avg_points_per_user', 0) > 1000 else 10
ext_cfg = dict(
method="radius",
radius_m=suggested_radius,
min_dwell_minutes=suggested_min_dwell,
max_gap_minutes=120 # 对稀疏数据增加
)
print(f"建议的提取配置: {ext_cfg}")
acts = ActivityExtractor(tb, **ext_cfg).extract()
print(f"提取了{len(acts)}个活动")Geolife数据集问题
数据加载问题
问题: 加载Geolife .plt文件的问题
解决方案: 验证数据结构并使用正确的加载:
def verify_geolife_structure(data_dir):
"""验证Geolife数据结构"""
import os
import glob
expected_structure = {}
user_dirs = glob.glob(os.path.join(data_dir, "*"))
for user_dir in user_dirs:
if not os.path.isdir(user_dir):
continue
user_name = os.path.basename(user_dir)
trajectory_dir = os.path.join(user_dir, "Trajectory")
if os.path.exists(trajectory_dir):
plt_files = glob.glob(os.path.join(trajectory_dir, "*.plt"))
expected_structure[user_name] = len(plt_files)
else:
expected_structure[user_name] = 0
print("Geolife数据结构:")
for user, file_count in expected_structure.items():
print(f" {user}: {file_count}个.plt文件")
return expected_structure
# 运行脚本前验证
verify_geolife_structure("scripts/Data")
# 指定特定用户运行
import os
os.environ['GEOLIFE_USER'] = 'user_001' # 指定用户脚本执行问题
问题: scripts/verify_geolife.py运行失败
解决方案: 检查环境和依赖:
# 确保在正确目录中
cd /path/to/mrra/project
# 检查脚本是否存在
ls scripts/verify_geolife.py
# 使用Python路径运行
python scripts/verify_geolife.py
# 如果有导入错误,检查安装
pip install -e .
# 检查必需的依赖
pip list | grep -E "(pandas|numpy|networkx|langchain)"最佳实践
开发工作流程
def robust_mrra_pipeline(df, llm_cfg, debug=True):
"""带错误处理和验证的稳健MRRA管道"""
try:
# 1. 验证输入数据
if debug:
print("🔍 验证输入数据...")
required_cols = ['user_id', 'timestamp', 'latitude', 'longitude']
missing_cols = [col for col in required_cols if col not in df.columns]
if missing_cols:
raise ValueError(f"缺少必需列: {missing_cols}")
if len(df) == 0:
raise ValueError("轨迹数据为空")
# 2. 使用缓存初始化
tb = TrajectoryBatch(df)
cm = CacheManager()
tb_hash = compute_tb_hash(tb)
if debug:
print(f"📊 处理{len(df)}个轨迹点,{tb.df['user_id'].nunique()}个用户")
print(f"🔑 缓存键: {tb_hash}")
# 3. 首先尝试缓存的活动
activities_key = "robust_pipeline_activities"
acts = cm.load_activities(tb_hash, activities_key)
if acts:
if debug:
print(f"✅ 加载了{len(acts)}个缓存活动")
else:
if debug:
print("📊 计算活动...")
# 测试LLM连接
if not test_llm_connection(llm_cfg):
raise RuntimeError("LLM连接失败")
# 提取活动
ext_cfg = dict(method="radius", radius_m=300, min_dwell_minutes=30)
acts = ActivityExtractor(tb, **ext_cfg).extract()
if len(acts) == 0:
print("⚠️ 未检测到活动,调整参数...")
ext_cfg['min_dwell_minutes'] = 15
ext_cfg['radius_m'] = 500
acts = ActivityExtractor(tb, **ext_cfg).extract()
# 分配目的
llm = make_llm(**llm_cfg)
acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=4).assign(acts)
# 缓存结果
cm.save_activities(tb_hash, activities_key, acts)
if debug:
print(f"💾 缓存了{len(acts)}个活动")
# 4. 构建图
graph_key = "robust_pipeline_graph"
cached_graph = cm.load_graph(tb_hash, graph_key)
if cached_graph:
if debug:
print("✅ 加载了缓存的移动图")
cfg = GraphConfig(grid_size_m=200, use_activities=True)
mg = MobilityGraph(tb, cfg)
mg.G = cached_graph
else:
if debug:
print("📊 构建移动图...")
cfg = GraphConfig(grid_size_m=200, use_activities=True)
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
cm.save_graph(tb_hash, graph_key, mg.G)
# 5. 创建智能体
retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)
reflection_cfg = dict(
max_round=1,
subAgents=[
{"name": "temporal", "prompt": "从选项中选择最可能的位置id。"},
{"name": "spatial", "prompt": "从选项中选择最可能的位置id。"},
],
aggregator="confidence_weighted_voting"
)
agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)
if debug:
print("✅ MRRA管道成功完成")
return agent, {
'trajectory_batch': tb,
'activities': acts,
'mobility_graph': mg,
'cache_hash': tb_hash
}
except Exception as e:
print(f"❌ 管道失败: {e}")
import traceback
if debug:
traceback.print_exc()
raise e
# 带错误处理的使用
try:
agent, components = robust_mrra_pipeline(df, llm_cfg, debug=True)
# 测试预测
user_id = components['trajectory_batch'].users()[0]
result = agent.invoke({
"task": "next_position",
"user_id": user_id
})
print("✅ 预测成功:", result)
except Exception as e:
print(f"❌ MRRA管道错误: {e}")常见陷阱:
- 始终使用
make_llm(**llm_cfg)创建LLM客户端对象 - 在图构建中使用缓存活动时设置
assume_purposes_assigned=True - 缓存带目的的活动以避免昂贵的LLM重新计算
- 监控大数据集的内存使用
- 在运行昂贵操作之前测试LLM连接性
调试工具
调试模式配置
import logging
# 启用调试日志
logging.basicConfig(level=logging.DEBUG)
# MRRA特定的调试标志
debug_config = {
'verbose_activity_extraction': True,
'log_llm_calls': True,
'save_intermediate_results': True,
'validate_graph_construction': True
}
# 在管道中使用
def debug_mrra_pipeline(df, llm_cfg, debug_config):
"""带全面调试的管道"""
if debug_config.get('verbose_activity_extraction'):
print("🔍 活动提取详情:")
tb = TrajectoryBatch(df)
for user_id in tb.users()[:1]: # 调试第一个用户
user_data = tb.for_user(user_id)
print(f" 用户{user_id}: {len(user_data)}个点")
print(f" 时间范围: {user_data['timestamp_local'].min()}到{user_data['timestamp_local'].max()}")
# 继续常规管道...性能分析
import time
import psutil
from functools import wraps
def profile_performance(func):
"""分析函数性能的装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
process = psutil.Process()
start_time = time.time()
start_memory = process.memory_info().rss / 1024 / 1024 # MB
result = func(*args, **kwargs)
end_time = time.time()
end_memory = process.memory_info().rss / 1024 / 1024 # MB
print(f"⏱️ {func.__name__}: {end_time - start_time:.2f}秒, 内存: {start_memory:.1f}MB → {end_memory:.1f}MB")
return result
return wrapper
# 使用性能分析
@profile_performance
def profile_activity_extraction(tb):
return ActivityExtractor(tb).extract()
@profile_performance
def profile_purpose_assignment(tb, llm, acts):
return ActivityPurposeAssigner(tb, llm=llm).assign(acts)
# 分析你的管道
acts = profile_activity_extraction(tb)
acts = profile_purpose_assignment(tb, llm, acts)获取帮助:
- 查看现有的GitHub问题寻找类似问题
- 报告错误时创建最小可重现示例
- 包含错误消息、堆栈跟踪和环境信息
- 首先使用较小的数据集测试以隔离问题