MRRA LogoMRRA
高级功能

缓存和性能

缓存管理、优化策略和推荐工作流程

缓存和性能

MRRA包含全面的缓存机制来优化性能并减少冗余计算。这在处理昂贵的LLM调用和复杂的图操作时尤其重要。

缓存架构

缓存位置和结构

MRRA使用基于轨迹批哈希的分层缓存系统:

.mrra_cache/
├── <tb_hash>/
│   ├── activities_<key>.json
│   ├── graph_<key>.gpickle
│   ├── chains_<key>.json
│   ├── patterns_<user>.json
│   └── reflection_<query_hash>.json
└── global/
    ├── models/
    └── configs/

缓存目录 .mrra_cache/ 会自动添加到 .gitignore,以防止缓存文件被纳入版本控制。

轨迹批哈希

缓存系统使用基于内容的哈希来确保缓存有效性:

from mrra.persist.cache import CacheManager, compute_tb_hash

# Generate a content-based cache key from the trajectory batch.
tb_hash = compute_tb_hash(tb)
print(f"轨迹批哈希: {tb_hash}")

# Cache entries are automatically organized under this hash.
cm = CacheManager()

核心缓存操作

活动缓存

带有分配目的的活动被缓存以避免昂贵的LLM重新计算:

from mrra.data.activity import ActivityExtractor
from mrra.analysis.activity_purpose import ActivityPurposeAssigner
from mrra.persist.cache import CacheManager, compute_tb_hash

# Extract activities and assign purposes (the expensive LLM step).
ext_cfg = dict(method="radius", radius_m=300, min_dwell_minutes=30)
acts = ActivityExtractor(tb, **ext_cfg).extract()
acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=8).assign(acts)

# Cache the purpose-annotated activities under the batch hash.
cm = CacheManager()
tb_hash = compute_tb_hash(tb)
cm.save_activities(tb_hash, "default", acts)

print(f"缓存了{len(acts)}个活动")
# Load previously cached activities.
cm = CacheManager()
tb_hash = compute_tb_hash(tb)

cached_acts = cm.load_activities(tb_hash, "default")
if cached_acts:
    print(f"加载了{len(cached_acts)}个缓存活动")
    acts = cached_acts
else:
    print("未找到缓存活动,正在计算...")
    # Recompute the activities here as needed.
# 为不同配置使用描述性的缓存键
def create_activity_cache_key(ext_cfg, llm_config):
    """Build a cache key encoding the extraction and LLM configuration.

    Different parameter combinations map to distinct keys, so cached
    results for one configuration are never served for another.
    """
    method = ext_cfg.get('method', 'radius')
    radius = ext_cfg.get('radius_m', 300)
    dwell = ext_cfg.get('min_dwell_minutes', 30)
    model = llm_config.get('model', 'default')
    max_gap = ext_cfg.get('max_gap_minutes', 90)
    return f"{method}_{radius}_{dwell}_{model}_{max_gap}"

# Save under the configuration-specific cache key.
cache_key = create_activity_cache_key(ext_cfg, llm_cfg)
cm.save_activities(tb_hash, cache_key, acts)

# Later retrieval must use the same key.
cached_acts = cm.load_activities(tb_hash, cache_key)

图缓存

移动图使用pickle序列化缓存以实现快速加载:

from mrra.graph.mobility_graph import MobilityGraph, GraphConfig

# Build and cache the mobility graph.
cfg = GraphConfig(grid_size_m=200, min_dwell_minutes=5, use_activities=True)
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)

# Persist the graph under a key that encodes its configuration.
graph_key = f"mobility_grid{cfg.grid_size_m}_dwell{cfg.min_dwell_minutes}"
cm.save_graph(tb_hash, graph_key, mg.G)

# Load the cached graph on later runs.
cached_graph = cm.load_graph(tb_hash, graph_key)
if cached_graph:
    mg.G = cached_graph
    print("加载了缓存移动图")
else:
    print("正在构建新移动图...")

链和模式缓存

缓存活动链和用户模式以快速检索:

# Cache activity chains.
chain_records = []  # generated from activities
cm.save_json(tb_hash, "chains_default", {
    "count": len(chain_records), 
    "records": chain_records[:1000]  # cap payload size
}, kind="chains")

# Cache per-user patterns.
from mrra.graph.pattern import PatternGenerate

pat = PatternGenerate(tb)
patterns = pat.long_short_patterns(user_id)
cm.save_json(tb_hash, f"patterns_{user_id}", patterns, kind="patterns")

# Load the cached data back.
cached_chains = cm.load_json(tb_hash, "chains_default", kind="chains")
cached_patterns = cm.load_json(tb_hash, f"patterns_{user_id}", kind="patterns")

高级缓存策略

分层缓存

为不同数据类型实现多缓存层:

class LayeredCacheManager:
    """Two-tier cache: an in-process dict in front of the disk CacheManager.

    Memory hits avoid disk I/O entirely; disk hits are promoted into
    memory for subsequent lookups. Hit/miss counts are tracked for
    performance reporting.
    """

    def __init__(self, base_dir=None):
        self.cache = CacheManager(base_dir)
        self.memory_cache = {}  # hot entries kept in process memory
        self.cache_stats = {'hits': 0, 'misses': 0}

    def get_activities(self, tb_hash, key):
        """Return cached activities, checking memory first, then disk."""
        mem_key = f"activities_{tb_hash}_{key}"

        # Fast path: already resident in memory.
        if mem_key in self.memory_cache:
            self.cache_stats['hits'] += 1
            return self.memory_cache[mem_key]

        # Slow path: fall back to the disk cache.
        from_disk = self.cache.load_activities(tb_hash, key)
        if not from_disk:
            self.cache_stats['misses'] += 1
            return None

        # Promote to memory for future lookups.
        self.memory_cache[mem_key] = from_disk
        self.cache_stats['hits'] += 1
        return from_disk

    def save_activities(self, tb_hash, key, activities):
        """Persist activities to disk and mirror them into memory."""
        self.cache.save_activities(tb_hash, key, activities)
        self.memory_cache[f"activities_{tb_hash}_{key}"] = activities

    def get_cache_stats(self):
        """Return hit/miss counts, hit rate, and memory-tier size."""
        hits = self.cache_stats['hits']
        misses = self.cache_stats['misses']
        total = hits + misses
        return {
            'hits': hits,
            'misses': misses,
            'hit_rate': hits / total if total > 0 else 0,
            'memory_entries': len(self.memory_cache),
        }

推荐工作流程

最优缓存策略

遵循这个推荐工作流程以获得最佳性能:

def optimized_mrra_workflow(df, llm_cfg, force_refresh=False):
    """Run the full MRRA pipeline with caching at every expensive step.

    Fixes garbled "构庻" characters (should be "构建") in the original
    log strings and comments.

    Args:
        df: Raw trajectory data used to build the TrajectoryBatch.
        llm_cfg: LLM configuration; forwarded to activity-purpose
            assignment and to the agent builder.
        force_refresh: When True, recompute activities even if a cached
            copy exists (graph/pattern caches are likewise bypassed).

    Returns:
        Tuple of (agent, components dict containing the trajectory batch,
        activities, mobility graph, patterns, and the cache hash).
    """

    # 1. Initialize trajectory batch and cache.
    tb = TrajectoryBatch(df)
    cm = CacheManager()
    tb_hash = compute_tb_hash(tb)

    print(f"使用轨迹批: {tb_hash}")

    # 2. Try to load cached activities (the most expensive step).
    activities_key = "activities_with_purposes"

    if not force_refresh:
        acts = cm.load_activities(tb_hash, activities_key)
        if acts:
            print(f"✅ 加载了{len(acts)}个缓存活动和目的")
        else:
            print("📆 正在计算活动和目的...")
            acts = compute_and_cache_activities(tb, llm_cfg, cm, tb_hash, activities_key)
    else:
        print("🔄 强制刷新:重新计算活动...")
        acts = compute_and_cache_activities(tb, llm_cfg, cm, tb_hash, activities_key)

    # 3. Try to load the cached mobility graph.
    graph_key = "mobility_graph_default"
    cached_graph = cm.load_graph(tb_hash, graph_key)

    if cached_graph and not force_refresh:
        print("✅ 加载了缓存移动图")
        cfg = GraphConfig(grid_size_m=200, min_dwell_minutes=5, use_activities=True)
        mg = MobilityGraph(tb, cfg)
        mg.G = cached_graph
    else:
        print("📆 正在构建移动图...")
        cfg = GraphConfig(grid_size_m=200, min_dwell_minutes=5, use_activities=True)
        mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
        cm.save_graph(tb_hash, graph_key, mg.G)
        print("💾 已缓存移动图")

    # 4. Generate and cache user patterns if not present.
    user_id = tb.users()[0]
    patterns_key = f"patterns_{user_id}"
    patterns = cm.load_json(tb_hash, patterns_key, kind="patterns")

    if not patterns or force_refresh:
        print("📆 正在生成用户模式...")
        from mrra.graph.pattern import PatternGenerate
        pat = PatternGenerate(tb)
        patterns = pat.long_short_patterns(user_id)
        cm.save_json(tb_hash, patterns_key, patterns, kind="patterns")
        print("💾 已缓存用户模式")
    else:
        print("✅ 加载了缓存用户模式")

    # 5. Build the retriever and agent (these steps are fast).
    retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)

    reflection_cfg = dict(
        max_round=1,
        subAgents=[
            {"name": "temporal", "prompt": "从选项中选择最可能的位置id。"},
            {"name": "spatial",  "prompt": "从选项中选择最可能的位置id。"},
        ],
        aggregator="confidence_weighted_voting",
    )

    agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)

    return agent, {
        'trajectory_batch': tb,
        'activities': acts,
        'mobility_graph': mg,
        'patterns': patterns,
        'cache_hash': tb_hash
    }

def compute_and_cache_activities(tb, llm_cfg, cm, tb_hash, key):
    """Extract activities, assign purposes via the LLM, and cache the result."""

    # Extract stay activities with fixed radius-based settings.
    extractor = ActivityExtractor(
        tb, method="radius", radius_m=300, min_dwell_minutes=30, max_gap_minutes=90
    )
    acts = extractor.extract()

    # Assign a purpose to each activity using the configured LLM.
    model = make_llm(**llm_cfg)
    acts = ActivityPurposeAssigner(tb, llm=model, concurrency=8).assign(acts)

    # Persist so later runs can skip the expensive LLM step.
    cm.save_activities(tb_hash, key, acts)
    print(f"💾 已缓存{len(acts)}个带目的的活动")

    return acts

# Usage
agent, components = optimized_mrra_workflow(df, llm_cfg, force_refresh=False)

# Subsequent runs are much faster thanks to the cache.
agent2, components2 = optimized_mrra_workflow(df, llm_cfg, force_refresh=False)

性能监控

缓存性能指标

class CacheMonitor:
    """Record hit/miss counts and timings for cache operations.

    Operations are classified by the wrapped callable's name: names
    containing 'load' count as hits/misses, names containing 'save'
    count as saves.
    """

    def __init__(self, cache_manager):
        self.cache = cache_manager
        self.metrics = {
            'hits': 0,
            'misses': 0,
            'saves': 0,
            'load_times': [],
            'save_times': []
        }

    def time_operation(self, operation, *args, **kwargs):
        """Invoke *operation*, timing it and updating the metrics."""
        import time

        started = time.time()
        outcome = operation(*args, **kwargs)
        duration = time.time() - started

        name = operation.__name__
        if 'load' in name:
            self.metrics['load_times'].append(duration)
            # A truthy result counts as a cache hit, falsy as a miss.
            self.metrics['hits' if outcome else 'misses'] += 1
        elif 'save' in name:
            self.metrics['save_times'].append(duration)
            self.metrics['saves'] += 1

        return outcome

    def get_performance_report(self):
        """Summarize hit rate and average load/save latencies."""
        loads = self.metrics['load_times']
        saves = self.metrics['save_times']
        total = self.metrics['hits'] + self.metrics['misses']

        return {
            'cache_hit_rate': self.metrics['hits'] / total if total > 0 else 0,
            'total_operations': total,
            'average_load_time': sum(loads) / len(loads) if loads else 0,
            'average_save_time': sum(saves) / len(saves) if saves else 0,
            'total_saves': self.metrics['saves'],
        }

# Usage
monitor = CacheMonitor(cm)

# Route cache calls through the monitor to collect metrics.
acts = monitor.time_operation(cm.load_activities, tb_hash, "default")
monitor.time_operation(cm.save_activities, tb_hash, "default", acts)

print("缓存性能:", monitor.get_performance_report())

生产环境考虑

  • 监控缓存目录大小以防止磁盘空间问题
  • 为长期运行的服务实施缓存清理策略
  • 为频繁访问的数据考虑缓存预热策略
  • 当数据模式变化时使用缓存版本控制

高性能提示

  • 尽可能使用SSD存储作为缓存目录
  • 为关键数据实施缓存预热
  • 为多节点部署考虑分布式缓存
  • 监控缓存命中率并相应调整策略

下一步