高级功能
缓存和性能
缓存管理、优化策略和推荐工作流程
缓存和性能
MRRA包含全面的缓存机制来优化性能并减少冗余计算。这在处理昂贵的LLM调用和复杂的图操作时尤其重要。
缓存架构
缓存位置和结构
MRRA使用基于轨迹批哈希的分层缓存系统:
.mrra_cache/
├── <tb_hash>/
│ ├── activities_<key>.json
│ ├── graph_<key>.gpickle
│ ├── chains_<key>.json
│ ├── patterns_<user>.json
│ └── reflection_<query_hash>.json
└── global/
├── models/
    └── configs/

缓存目录:.mrra_cache/目录会自动添加到.gitignore以防止缓存文件的版本控制。
轨迹批哈希
缓存系统使用基于内容的哈希来确保缓存有效性:
from mrra.persist.cache import CacheManager, compute_tb_hash
# 从轨迹数据生成缓存键
tb_hash = compute_tb_hash(tb)
print(f"轨迹批哈希: {tb_hash}")
# 缓存自动按此哈希组织
cm = CacheManager()

核心缓存操作
活动缓存
带有分配目的的活动被缓存以避免昂贵的LLM重新计算:
from mrra.data.activity import ActivityExtractor
from mrra.analysis.activity_purpose import ActivityPurposeAssigner
from mrra.persist.cache import CacheManager, compute_tb_hash
# 提取并分配目的
ext_cfg = dict(method="radius", radius_m=300, min_dwell_minutes=30)
acts = ActivityExtractor(tb, **ext_cfg).extract()
acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=8).assign(acts)
# 缓存带目的的活动
cm = CacheManager()
tb_hash = compute_tb_hash(tb)
cm.save_activities(tb_hash, "default", acts)
print(f"缓存了{len(acts)}个活动")

# 加载缓存的活动
cm = CacheManager()
tb_hash = compute_tb_hash(tb)
cached_acts = cm.load_activities(tb_hash, "default")
if cached_acts:
print(f"加载了{len(cached_acts)}个缓存活动")
acts = cached_acts
else:
print("未找到缓存活动,正在计算...")
# 根据需要计算活动

# 为不同配置使用描述性的缓存键
def create_activity_cache_key(ext_cfg, llm_config):
    """Build a cache key that encodes the extraction and LLM configuration.

    Activities cached under this key are only reused when every relevant
    parameter matches, so changing the extraction radius or the LLM model
    naturally invalidates stale cache entries.
    """
    method = ext_cfg.get('method', 'radius')
    radius = ext_cfg.get('radius_m', 300)
    dwell = ext_cfg.get('min_dwell_minutes', 30)
    model = llm_config.get('model', 'default')
    max_gap = ext_cfg.get('max_gap_minutes', 90)
    return f"{method}_{radius}_{dwell}_{model}_{max_gap}"
# 使用配置特定的缓存键
cache_key = create_activity_cache_key(ext_cfg, llm_cfg)
cm.save_activities(tb_hash, cache_key, acts)
# 后续使用相同键检索
cached_acts = cm.load_activities(tb_hash, cache_key)

图缓存
移动图使用pickle序列化缓存以实现快速加载:
from mrra.graph.mobility_graph import MobilityGraph, GraphConfig
# 构建并缓存图
cfg = GraphConfig(grid_size_m=200, min_dwell_minutes=5, use_activities=True)
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
# 缓存图
graph_key = f"mobility_grid{cfg.grid_size_m}_dwell{cfg.min_dwell_minutes}"
cm.save_graph(tb_hash, graph_key, mg.G)
# 加载缓存图
cached_graph = cm.load_graph(tb_hash, graph_key)
if cached_graph:
mg.G = cached_graph
print("加载了缓存移动图")
else:
print("正在构建新移动图...")

链和模式缓存
缓存活动链和用户模式以快速检索:
# 缓存活动链
chain_records = [] # 从活动生成
cm.save_json(tb_hash, "chains_default", {
"count": len(chain_records),
"records": chain_records[:1000] # 限制大小
}, kind="chains")
# 缓存用户模式
from mrra.graph.pattern import PatternGenerate
pat = PatternGenerate(tb)
patterns = pat.long_short_patterns(user_id)
cm.save_json(tb_hash, f"patterns_{user_id}", patterns, kind="patterns")
# 加载缓存数据
cached_chains = cm.load_json(tb_hash, "chains_default", kind="chains")
cached_patterns = cm.load_json(tb_hash, f"patterns_{user_id}", kind="patterns")

高级缓存策略
分层缓存
为不同数据类型实现多缓存层:
class LayeredCacheManager:
    """Layer a fast in-process dict on top of the disk-backed CacheManager.

    Disk hits are promoted into memory so repeated lookups within one
    process never touch the filesystem again. Hit/miss counters are kept
    so the effectiveness of the layering can be inspected.
    """

    def __init__(self, base_dir=None):
        self.cache = CacheManager(base_dir)
        # Hot tier: results already loaded by this process.
        self.memory_cache = {}
        self.cache_stats = {'hits': 0, 'misses': 0}

    def get_activities(self, tb_hash, key):
        """Fetch activities, consulting memory first and disk second."""
        memory_key = f"activities_{tb_hash}_{key}"
        if memory_key in self.memory_cache:
            self.cache_stats['hits'] += 1
            return self.memory_cache[memory_key]
        from_disk = self.cache.load_activities(tb_hash, key)
        if from_disk:
            # Promote the disk result to the hot tier for subsequent calls.
            self.memory_cache[memory_key] = from_disk
            self.cache_stats['hits'] += 1
            return from_disk
        self.cache_stats['misses'] += 1
        return None

    def save_activities(self, tb_hash, key, activities):
        """Persist activities to disk and mirror them in memory."""
        self.cache.save_activities(tb_hash, key, activities)
        self.memory_cache[f"activities_{tb_hash}_{key}"] = activities

    def get_cache_stats(self):
        """Summarize hit/miss counters and hot-tier size for this process."""
        hits = self.cache_stats['hits']
        misses = self.cache_stats['misses']
        total = hits + misses
        return {
            'hits': hits,
            'misses': misses,
            'hit_rate': hits / total if total > 0 else 0,
            'memory_entries': len(self.memory_cache),
        }

推荐工作流程
最优缓存策略
遵循这个推荐工作流程以获得最佳性能:
def optimized_mrra_workflow(df, llm_cfg, force_refresh=False):
    """Run the full MRRA pipeline with caching at every expensive stage.

    Args:
        df: Raw trajectory dataframe used to build the TrajectoryBatch.
        llm_cfg: LLM configuration dict, used both for purpose assignment
            and for the final agent.
        force_refresh: When True, recompute all artifacts and overwrite
            any cached versions.

    Returns:
        Tuple ``(agent, components)`` where ``components`` exposes the
        intermediate artifacts (batch, activities, graph, patterns, hash).
    """
    # 1. Build the trajectory batch and derive its content-based cache hash.
    tb = TrajectoryBatch(df)
    cm = CacheManager()
    tb_hash = compute_tb_hash(tb)
    print(f"使用轨迹批: {tb_hash}")

    # 2. Activities with purposes are the most expensive step (LLM calls),
    #    so consult the cache first unless a refresh is forced.
    activities_key = "activities_with_purposes"
    if not force_refresh:
        acts = cm.load_activities(tb_hash, activities_key)
        if acts:
            print(f"✅ 加载了{len(acts)}个缓存活动和目的")
        else:
            print("📆 正在计算活动和目的...")
            acts = compute_and_cache_activities(tb, llm_cfg, cm, tb_hash, activities_key)
    else:
        print("🔄 强制刷新:重新计算活动...")
        acts = compute_and_cache_activities(tb, llm_cfg, cm, tb_hash, activities_key)

    # 3. Mobility graph: cheaper than activities but still worth caching.
    #    The config is identical on both paths, so build it once.
    graph_key = "mobility_graph_default"
    cfg = GraphConfig(grid_size_m=200, min_dwell_minutes=5, use_activities=True)
    cached_graph = cm.load_graph(tb_hash, graph_key)
    if cached_graph and not force_refresh:
        print("✅ 加载了缓存移动图")
        mg = MobilityGraph(tb, cfg)
        mg.G = cached_graph
    else:
        # NOTE: fixed a mis-encoded character (构庻 -> 构建) in this message.
        print("📆 正在构建移动图...")
        mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
        cm.save_graph(tb_hash, graph_key, mg.G)
        print("💾 已缓存移动图")

    # 4. User patterns: JSON-cached per user id.
    user_id = tb.users()[0]
    patterns_key = f"patterns_{user_id}"
    patterns = cm.load_json(tb_hash, patterns_key, kind="patterns")
    if not patterns or force_refresh:
        print("📆 正在生成用户模式...")
        from mrra.graph.pattern import PatternGenerate
        pat = PatternGenerate(tb)
        patterns = pat.long_short_patterns(user_id)
        cm.save_json(tb_hash, patterns_key, patterns, kind="patterns")
        print("💾 已缓存用户模式")
    else:
        print("✅ 加载了缓存用户模式")

    # 5. Retriever and agent construction are fast; no caching needed.
    retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)
    reflection_cfg = dict(
        max_round=1,
        subAgents=[
            {"name": "temporal", "prompt": "从选项中选择最可能的位置id。"},
            {"name": "spatial", "prompt": "从选项中选择最可能的位置id。"},
        ],
        aggregator="confidence_weighted_voting",
    )
    agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)

    return agent, {
        'trajectory_batch': tb,
        'activities': acts,
        'mobility_graph': mg,
        'patterns': patterns,
        'cache_hash': tb_hash,
    }
def compute_and_cache_activities(tb, llm_cfg, cm, tb_hash, key):
    """Extract activities, assign purposes via the LLM, and cache the result.

    Returns the purposed activity list so callers can use it immediately.
    """
    # Stay-point extraction with the documented default parameters.
    extraction = dict(method="radius", radius_m=300, min_dwell_minutes=30, max_gap_minutes=90)
    acts = ActivityExtractor(tb, **extraction).extract()
    # Purpose assignment is the LLM-heavy step; run with bounded concurrency.
    llm = make_llm(**llm_cfg)
    acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=8).assign(acts)
    # Persist under the caller-supplied key so later runs skip the LLM work.
    cm.save_activities(tb_hash, key, acts)
    print(f"💾 已缓存{len(acts)}个带目的的活动")
    return acts
# 使用方法
agent, components = optimized_mrra_workflow(df, llm_cfg, force_refresh=False)
# 后续运行由于缓存将快得多
agent2, components2 = optimized_mrra_workflow(df, llm_cfg, force_refresh=False)

性能监控
缓存性能指标
class CacheMonitor:
    """Wrap cache-manager calls to record hit/miss counts and timings.

    Operations are classified by function name: anything containing
    "load" is treated as a lookup (hit when the result is truthy),
    anything containing "save" as a write.
    """

    def __init__(self, cache_manager):
        self.cache = cache_manager
        # Counters plus raw per-call durations for load/save operations.
        self.metrics = {
            'hits': 0,
            'misses': 0,
            'saves': 0,
            'load_times': [],
            'save_times': [],
        }

    def time_operation(self, operation, *args, **kwargs):
        """Invoke *operation*, timing it and updating the metrics."""
        import time
        started = time.time()
        result = operation(*args, **kwargs)
        duration = time.time() - started
        name = operation.__name__
        if 'load' in name:
            self.metrics['load_times'].append(duration)
            # A truthy result counts as a cache hit.
            bucket = 'hits' if result else 'misses'
            self.metrics[bucket] += 1
        elif 'save' in name:
            self.metrics['save_times'].append(duration)
            self.metrics['saves'] += 1
        return result

    def get_performance_report(self):
        """Aggregate recorded metrics into a summary dict."""
        m = self.metrics

        def _mean(values):
            return sum(values) / len(values) if values else 0

        lookups = m['hits'] + m['misses']
        return {
            'cache_hit_rate': m['hits'] / lookups if lookups > 0 else 0,
            'total_operations': lookups,
            'average_load_time': _mean(m['load_times']),
            'average_save_time': _mean(m['save_times']),
            'total_saves': m['saves'],
        }
# 使用方法
monitor = CacheMonitor(cm)
# 监控缓存操作
acts = monitor.time_operation(cm.load_activities, tb_hash, "default")
monitor.time_operation(cm.save_activities, tb_hash, "default", acts)
print("缓存性能:", monitor.get_performance_report())

生产环境考虑:
- 监控缓存目录大小以防止磁盘空间问题
- 为长期运行的服务实施缓存清理策略
- 为频繁访问的数据考虑缓存预热策略
- 当数据模式变化时使用缓存版本控制
高性能提示:
- 尽可能使用SSD存储作为缓存目录
- 为关键数据实施缓存预热
- 为多节点部署考虑分布式缓存
- 监控缓存命中率并相应调整策略