核心概念
活动链
在智能体构建中提取、总结和运行活动链
活动链
活动链是按时间顺序排列的相邻活动转换序列,记录了位置和目的层面的移动模式。它们对以下方面至关重要:
- 序列模式挖掘(例如,"工作→用餐→工作→家")
- 预测约束和解释(限制/指导下一步候选)
- 用户画像和异常检测(不规则转换/不规则时间)
提取过程
活动链通过以下步骤提取:
- 按用户分组并按开始时间排序活动
- 生成转换记录用于相邻活动(i-1 → i),包括
from_place/to_place、from_purpose/to_purpose、时间戳等 - 聚合为"目的转换矩阵"或"位置转换统计"
实现示例
以下是如何提取和缓存活动链:
from collections import defaultdict
from mrra.persist.cache import CacheManager, compute_tb_hash
cm = CacheManager()
tb_hash = compute_tb_hash(tb)
acts = cm.load_activities(tb_hash, "default") or acts
# 按用户分组活动
user_groups = defaultdict(list)
for a in sorted(acts, key=lambda r: (r.user_id, r.start)):
user_groups[a.user_id].append(a)
# 生成链记录
chain_records = []
for uid, seq in user_groups.items():
for i in range(1, len(seq)):
prev, cur = seq[i-1], seq[i]
chain_records.append({
"user_id": uid,
"from_place": prev.place_id,
"to_place": cur.place_id,
"from_purpose": getattr(prev, "purpose", "Other"),
"to_purpose": getattr(cur, "purpose", "Other"),
"at": str(cur.start),
})
# 缓存结果
cm.save_json(
tb_hash,
"chains_default",
{"count": len(chain_records), "records": chain_records[:1000]},
kind="chains"
)链分析
一旦有了活动链,你可以执行各种分析:
转换矩阵分析
from collections import Counter
import pandas as pd
# 目的转换分析
purpose_transitions = Counter()
location_transitions = Counter()
for record in chain_records:
purpose_transitions[(record["from_purpose"], record["to_purpose"])] += 1
location_transitions[(record["from_place"], record["to_place"])] += 1
# 转换为DataFrame进行分析
purpose_df = pd.DataFrame([
{"from": k[0], "to": k[1], "count": v}
for k, v in purpose_transitions.items()
])
print("顶级目的转换:")
print(purpose_df.sort_values("count", ascending=False).head(10))时间模式分析
import pandas as pd
# 转换时间戳用于时间分析
chain_df = pd.DataFrame(chain_records)
chain_df["timestamp"] = pd.to_datetime(chain_df["at"])
chain_df["hour"] = chain_df["timestamp"].dt.hour
chain_df["dow"] = chain_df["timestamp"].dt.dayofweek
# 分析时间模式
temporal_patterns = chain_df.groupby(["hour", "from_purpose", "to_purpose"]).size()
print("峰值转换时间:")
print(temporal_patterns.sort_values(ascending=False).head(10))与智能体构建的集成
活动链应在智能体构建期间生成和缓存,以获得最佳性能:
推荐工作流程:
- 完成"活动提取 + 目的分配"
- 立即生成和缓存活动链
- 智能体可以使用"最近目的链/位置链"作为提示或额外证据
在检索中的使用
活动链可以通过多种方式用于增强检索:
from mrra.retriever.graph_rag import GraphRAGGenerate
# 带链上下文的增强检索器
class ChainEnhancedRetriever(GraphRAGGenerate):
def __init__(self, *args, chain_records=None, **kwargs):
super().__init__(*args, **kwargs)
self.chain_records = chain_records or []
def get_recent_chains(self, user_id, purpose=None, limit=5):
"""获取最近的活动链用于上下文"""
user_chains = [
c for c in self.chain_records
if c["user_id"] == user_id
]
if purpose:
user_chains = [
c for c in user_chains
if purpose in [c["from_purpose"], c["to_purpose"]]
]
return sorted(user_chains, key=lambda x: x["at"])[-limit:]
def get_relevant_documents(self, query):
"""带链上下文的增强检索"""
docs = super().get_relevant_documents(query)
# 如果提供了user_id,添加链上下文
if "user_id" in query:
recent_chains = self.get_recent_chains(
query["user_id"],
query.get("purpose")
)
# 将链上下文添加到第一个文档
if docs and recent_chains:
chain_text = f"最近的活动转换: {recent_chains[-3:]}"
docs[0].page_content = f"{chain_text}\n\n{docs[0].page_content}"
return docs基于链的预测约束
活动链也可用于约束和验证预测:
def validate_prediction_with_chains(prediction, user_chains, current_purpose):
"""根据历史转换模式验证预测"""
# 从链中获取常见的下一个目的
next_purposes = [
c["to_purpose"] for c in user_chains
if c["from_purpose"] == current_purpose
]
if not next_purposes:
return True # 没有历史数据,允许预测
# 检查预测目的是否合理
purpose_frequency = Counter(next_purposes)
common_purposes = [p for p, count in purpose_frequency.most_common(3)]
predicted_purpose = getattr(prediction, 'purpose', None)
if predicted_purpose and predicted_purpose not in common_purposes:
# 对不寻常的转换添加置信度惩罚
confidence_penalty = 0.3
prediction.confidence *= (1 - confidence_penalty)
prediction.rationale += f" (不寻常的转换: {current_purpose}→{predicted_purpose})"
return prediction性能考虑
缓存管理:对于长期用户,活动链可能变得很大。考虑:
- 限制链记录(例如,最后1000个转换)
- 基于时间的过滤(例如,最近30天)
- 定期缓存清理
高效链存储
# 带时间窗口的高效链存储
def store_chains_with_window(chain_records, days_back=30):
from datetime import datetime, timedelta
cutoff = datetime.now() - timedelta(days=days_back)
recent_chains = [
c for c in chain_records
if datetime.fromisoformat(c["at"]) > cutoff
]
return {
"count": len(recent_chains),
"records": recent_chains,
"window_days": days_back,
"cutoff": cutoff.isoformat()
}用例和应用
1. 路线推荐
基于历史转换模式使用链推荐最佳路线。
2. 异常检测
识别偏离已建立模式的不寻常活动转换。
3. 个性化预测
基于个体转换偏好加权预测。
4. 时间优化
基于历史转换时间优化时间预测。