MRRA LogoMRRA
核心概念

活动链

在智能体构建中提取、总结和运行活动链

活动链

活动链是按时间顺序排列的相邻活动转换序列,记录了位置和目的层面的移动模式。它们对以下方面至关重要:

  • 序列模式挖掘(例如,"工作→用餐→工作→家")
  • 预测约束和解释(限制/指导下一步候选)
  • 用户画像和异常检测(不规则转换/不规则时间)

提取过程

活动链通过以下步骤提取:

  1. 按用户分组并按开始时间排序活动
  2. 生成转换记录用于相邻活动(i-1 → i),包括from_place/to_placefrom_purpose/to_purpose、时间戳等
  3. 聚合为"目的转换矩阵"或"位置转换统计"

实现示例

以下是如何提取和缓存活动链:

from collections import defaultdict
from mrra.persist.cache import CacheManager, compute_tb_hash

cm = CacheManager()
tb_hash = compute_tb_hash(tb)
acts = cm.load_activities(tb_hash, "default") or acts

# 按用户分组活动
user_groups = defaultdict(list)
for a in sorted(acts, key=lambda r: (r.user_id, r.start)):
    user_groups[a.user_id].append(a)

# 生成链记录
chain_records = []
for uid, seq in user_groups.items():
    for i in range(1, len(seq)):
        prev, cur = seq[i-1], seq[i]
        chain_records.append({
            "user_id": uid,
            "from_place": prev.place_id,
            "to_place": cur.place_id,
            "from_purpose": getattr(prev, "purpose", "Other"),
            "to_purpose": getattr(cur, "purpose", "Other"),
            "at": str(cur.start),
        })

# 缓存结果
cm.save_json(
    tb_hash, 
    "chains_default", 
    {"count": len(chain_records), "records": chain_records[:1000]}, 
    kind="chains"
)

链分析

一旦有了活动链,你可以执行各种分析:

转换矩阵分析

from collections import Counter
import pandas as pd

# 目的转换分析
purpose_transitions = Counter()
location_transitions = Counter()

for record in chain_records:
    purpose_transitions[(record["from_purpose"], record["to_purpose"])] += 1
    location_transitions[(record["from_place"], record["to_place"])] += 1

# 转换为DataFrame进行分析
purpose_df = pd.DataFrame([
    {"from": k[0], "to": k[1], "count": v} 
    for k, v in purpose_transitions.items()
])

print("顶级目的转换:")
print(purpose_df.sort_values("count", ascending=False).head(10))

时间模式分析

import pandas as pd

# 转换时间戳用于时间分析
chain_df = pd.DataFrame(chain_records)
chain_df["timestamp"] = pd.to_datetime(chain_df["at"])
chain_df["hour"] = chain_df["timestamp"].dt.hour
chain_df["dow"] = chain_df["timestamp"].dt.dayofweek

# 分析时间模式
temporal_patterns = chain_df.groupby(["hour", "from_purpose", "to_purpose"]).size()
print("峰值转换时间:")
print(temporal_patterns.sort_values(ascending=False).head(10))

与智能体构建的集成

活动链应在智能体构建期间生成和缓存,以获得最佳性能:

推荐工作流程:

  1. 完成"活动提取 + 目的分配"
  2. 立即生成和缓存活动链
  3. 智能体可以使用"最近目的链/位置链"作为提示或额外证据

在检索中的使用

活动链可以通过多种方式用于增强检索:

from mrra.retriever.graph_rag import GraphRAGGenerate

# 带链上下文的增强检索器
class ChainEnhancedRetriever(GraphRAGGenerate):
    def __init__(self, *args, chain_records=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.chain_records = chain_records or []
    
    def get_recent_chains(self, user_id, purpose=None, limit=5):
        """获取最近的活动链用于上下文"""
        user_chains = [
            c for c in self.chain_records 
            if c["user_id"] == user_id
        ]
        
        if purpose:
            user_chains = [
                c for c in user_chains 
                if purpose in [c["from_purpose"], c["to_purpose"]]
            ]
        
        return sorted(user_chains, key=lambda x: x["at"])[-limit:]
    
    def get_relevant_documents(self, query):
        """带链上下文的增强检索"""
        docs = super().get_relevant_documents(query)
        
        # 如果提供了user_id,添加链上下文
        if "user_id" in query:
            recent_chains = self.get_recent_chains(
                query["user_id"], 
                query.get("purpose")
            )
            
            # 将链上下文添加到第一个文档
            if docs and recent_chains:
                chain_text = f"最近的活动转换: {recent_chains[-3:]}"
                docs[0].page_content = f"{chain_text}\n\n{docs[0].page_content}"
        
        return docs

基于链的预测约束

活动链也可用于约束和验证预测:

def validate_prediction_with_chains(prediction, user_chains, current_purpose):
    """根据历史转换模式验证预测"""
    
    # 从链中获取常见的下一个目的
    next_purposes = [
        c["to_purpose"] for c in user_chains 
        if c["from_purpose"] == current_purpose
    ]
    
    if not next_purposes:
        return True  # 没有历史数据,允许预测
    
    # 检查预测目的是否合理
    purpose_frequency = Counter(next_purposes)
    common_purposes = [p for p, count in purpose_frequency.most_common(3)]
    
    predicted_purpose = getattr(prediction, 'purpose', None)
    
    if predicted_purpose and predicted_purpose not in common_purposes:
        # 对不寻常的转换添加置信度惩罚
        confidence_penalty = 0.3
        prediction.confidence *= (1 - confidence_penalty)
        prediction.rationale += f" (不寻常的转换: {current_purpose}{predicted_purpose})"
    
    return prediction

性能考虑

缓存管理:对于长期用户,活动链可能变得很大。考虑:

  • 限制链记录(例如,最后1000个转换)
  • 基于时间的过滤(例如,最近30天)
  • 定期缓存清理

高效链存储

# 带时间窗口的高效链存储
def store_chains_with_window(chain_records, days_back=30):
    from datetime import datetime, timedelta
    
    cutoff = datetime.now() - timedelta(days=days_back)
    
    recent_chains = [
        c for c in chain_records 
        if datetime.fromisoformat(c["at"]) > cutoff
    ]
    
    return {
        "count": len(recent_chains),
        "records": recent_chains,
        "window_days": days_back,
        "cutoff": cutoff.isoformat()
    }

用例和应用

1. 路线推荐

基于历史转换模式使用链推荐最佳路线。

2. 异常检测

识别偏离已建立模式的不寻常活动转换。

3. 个性化预测

基于个体转换偏好加权预测。

4. 时间优化

基于历史转换时间优化时间预测。

下一步