资源
示例
MRRA的实际示例和用例
示例
本节提供在各种移动性预测任务和用例中使用MRRA的全面示例。
基础示例
简单的下一位置预测
import pandas as pd
from mrra.data.trajectory import TrajectoryBatch
from mrra.data.activity import ActivityExtractor
from mrra.analysis.activity_purpose import ActivityPurposeAssigner
from mrra.graph.mobility_graph import MobilityGraph, GraphConfig
from mrra.retriever.graph_rag import GraphRAGGenerate
from mrra.agents.builder import build_mrra_agent
from mrra.agents.subagents import make_llm
# 样本轨迹数据
data = {
'user_id': ['user_1', 'user_1', 'user_1', 'user_1'],
'timestamp': ['2024-01-01 08:00:00', '2024-01-01 12:00:00', '2024-01-01 18:00:00', '2024-01-02 08:30:00'],
'latitude': [31.2304, 31.2404, 31.2304, 31.2404],
'longitude': [121.4737, 121.4837, 121.4737, 121.4837]
}
df = pd.DataFrame(data)
tb = TrajectoryBatch(df)
# 提取活动
acts = ActivityExtractor(tb, radius_m=300, min_dwell_minutes=30).extract()
# 设置LLM
llm_cfg = dict(
provider='openai',
model='gpt-4o-mini',
api_key='your-api-key',
temperature=0.2
)
llm = make_llm(**llm_cfg)
# 分配目的
acts = ActivityPurposeAssigner(tb, llm=llm).assign(acts)
# 构建移动图
cfg = GraphConfig(grid_size_m=200, use_activities=True)
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
# 创建检索器和智能体
retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)
reflection_cfg = dict(
max_round=1,
subAgents=[
{"name": "temporal", "prompt": "基于时间模式选择最可能的位置。"},
{"name": "spatial", "prompt": "基于空间模式选择最可能的位置。"}
],
aggregator="confidence_weighted_voting"
)
agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)
# 进行预测
result = agent.invoke({
"task": "next_position",
"user_id": "user_1",
"t": "2024-01-02 12:00:00"
})
print("预测结果:", result)多用户批处理
def process_multiple_users(df, llm_cfg):
"""处理多个用户的轨迹数据"""
tb = TrajectoryBatch(df)
users = tb.users()
print(f"处理{len(users)}个用户...")
# 为所有用户提取活动
acts = ActivityExtractor(tb, radius_m=300, min_dwell_minutes=30).extract()
# 并发分配目的
llm = make_llm(**llm_cfg)
acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=8).assign(acts)
# 构建图
cfg = GraphConfig(grid_size_m=200, use_activities=True)
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
# 创建智能体
retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)
reflection_cfg = dict(
max_round=1,
subAgents=[
{"name": "temporal", "prompt": "专注于时间模式。"},
{"name": "spatial", "prompt": "专注于位置模式。"}
],
aggregator="confidence_weighted_voting"
)
agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)
# 为所有用户进行预测
results = {}
for user_id in users:
user_data = tb.for_user(user_id)
last_timestamp = user_data.iloc[-1]['timestamp_local'].strftime('%Y-%m-%d %H:%M:%S')
try:
result = agent.invoke({
"task": "next_position",
"user_id": user_id,
"t": last_timestamp
})
results[user_id] = result
print(f"✅ 已预测{user_id}")
except Exception as e:
print(f"❌ {user_id}失败: {e}")
results[user_id] = None
return results, agent
# 使用方法
# results, agent = process_multiple_users(df, llm_cfg)高级示例
带领域知识的自定义目的分配
# 医疗保健移动性分析
class HealthcarePurposeAssigner(ActivityPurposeAssigner):
def create_purpose_prompt(self, activity):
"""医疗保健上下文的自定义提示"""
base_info = f"""
活动位置: ({activity.center_lat:.6f}, {activity.center_lon:.6f})
持续时间: {activity.duration_minutes}分钟
时间: {activity.start.strftime('%A %H:%M')} - {activity.end.strftime('%H:%M')}
"""
healthcare_prompt = f"""
您正在分析医疗保健上下文中的患者移动模式。
{base_info}
常见的医疗相关目的包括:
- medical_appointment: 医生就诊、专科预约、例行检查
- pharmacy: 取药、处方配药
- emergency: 急诊科就诊、紧急护理
- therapy: 物理治疗、心理健康预约、康复
- diagnostic: 实验室检查、影像检查、医疗程序
- home: 患者住所、康复地点
- work: 工作场所(如果适用)
- caregiving: 照顾家庭成员、陪伴他人
- other: 与医疗保健无关的其他活动
基于时间、持续时间和典型医疗保健模式,
这个活动最可能的目的是什么?
只回答目的类别(例如"medical_appointment")。
"""
return healthcare_prompt
# 医疗保健数据的使用
healthcare_acts = HealthcarePurposeAssigner(tb, llm=llm, concurrency=4).assign(acts)
# 分析医疗保健模式
healthcare_purposes = [act.purpose for act in healthcare_acts]
purpose_counts = pd.Series(healthcare_purposes).value_counts()
print("医疗保健活动分布:")
print(purpose_counts)# 大学校园移动性分析
class CampusPurposeAssigner(ActivityPurposeAssigner):
def create_purpose_prompt(self, activity):
"""大学校园上下文的自定义提示"""
base_info = f"""
活动位置: ({activity.center_lat:.6f}, {activity.center_lon:.6f})
持续时间: {activity.duration_minutes}分钟
时间: {activity.start.strftime('%A %H:%M')} - {activity.end.strftime('%H:%M')}
星期: {activity.start.strftime('%A')}
"""
campus_prompt = f"""
您正在分析大学校园中的学生/教职员工移动模式。
{base_info}
常见的校园目的包括:
- lecture: 参加课程、讲座、研讨会
- study: 图书馆、学习室、个人学习区域
- research: 实验室工作、研究活动、与导师会面
- dining: 食堂、美食广场、校园餐厅
- recreation: 健身房、体育设施、娱乐活动
- social: 学生会、社交空间、团体活动
- admin: 行政办公室、注册处、财务援助
- residence: 宿舍、校内住宿
- work: 校园就业、助教职责
- transit: 公交站、停车场、校园交通
- other: 其他校园活动
考虑典型的学术安排:
- 工作日早上:通常是讲座或研究
- 中午:用餐或社交活动
- 下午:实验室、学习或工作
- 晚上:娱乐、社交活动或住宿
- 周末:更多社交和娱乐活动
这个校园活动最可能的目的是什么?
只回答目的类别(例如"lecture")。
"""
return campus_prompt
# 专门的校园智能体配置
campus_reflection_cfg = dict(
max_round=1,
subAgents=[
{
"name": "academic_schedule",
"prompt": "专注于学术时间安排模式(课程时间、学期等)"
},
{
"name": "campus_geography",
"prompt": "专注于校园布局和设施位置"
},
{
"name": "social_patterns",
"prompt": "专注于学生社交和娱乐模式"
}
],
aggregator="confidence_weighted_voting"
)# 旅游移动性分析
class TourismPurposeAssigner(ActivityPurposeAssigner):
def create_purpose_prompt(self, activity):
"""旅游上下文的自定义提示"""
base_info = f"""
活动位置: ({activity.center_lat:.6f}, {activity.center_lon:.6f})
持续时间: {activity.duration_minutes}分钟
时间: {activity.start.strftime('%A %H:%M')} - {activity.end.strftime('%H:%M')}
"""
tourism_prompt = f"""
您正在分析目的地城市中的游客移动模式。
{base_info}
常见的旅游目的包括:
- accommodation: 酒店、青年旅社、度假租赁入住/住宿
- sightseeing: 旅游景点、地标、观景点
- cultural: 博物馆、画廊、文化遗址、历史地点
- dining: 餐厅、当地美食、美食之旅
- shopping: 纪念品商店、当地市场、购物区
- entertainment: 表演、音乐会、夜生活、酒吧
- recreation: 公园、海滩、户外活动
- transportation: 机场、火车站、公交站、出租车站
- services: 旅游信息、货币兑换、医疗
- business: 会议、大会(商务旅行者)
- other: 其他旅游活动
考虑典型的旅游行为:
- 早上:通常是观光或文化活动
- 中午:用餐或购物
- 下午:更多观光或娱乐
- 晚上:用餐、娱乐或住宿
- 长时间(2小时以上):主要景点或住宿
- 短时间(<30分钟):服务或交通
这个旅游活动最可能的目的是什么?
只回答目的类别(例如"sightseeing")。
"""
return tourism_prompt
# 带本地上下文的旅游特定智能体
tourism_reflection_cfg = dict(
max_round=1,
subAgents=[
{
"name": "attraction_expert",
"prompt": "专注于旅游景点和热门目的地",
"mcp": {
"maps": {} # 为景点使用POI信息
}
},
{
"name": "temporal_expert",
"prompt": "专注于旅游时间模式和季节性行为"
},
{
"name": "logistics_expert",
"prompt": "专注于旅游后勤(住宿、交通、服务)"
}
],
aggregator="confidence_weighted_voting"
)实时流式预测
import asyncio
from datetime import datetime, timedelta
class StreamingMobilityPredictor:
"""实时流式移动性预测系统"""
def __init__(self, base_agent, update_interval_minutes=30):
self.base_agent = base_agent
self.update_interval = update_interval_minutes
self.last_update = {}
self.prediction_cache = {}
async def start_streaming(self, user_ids, prediction_callback):
"""为多个用户开始流式预测"""
print(f"为{len(user_ids)}个用户开始流式预测...")
while True:
tasks = []
for user_id in user_ids:
tasks.append(self._update_user_prediction(user_id, prediction_callback))
# 并发处理所有用户
await asyncio.gather(*tasks)
# 等待下次更新
await asyncio.sleep(self.update_interval * 60)
async def _update_user_prediction(self, user_id, callback):
"""为单个用户更新预测"""
try:
current_time = datetime.now()
# 生成预测
result = await self._predict_async(user_id, current_time)
# 缓存结果
self.prediction_cache[user_id] = {
'prediction': result,
'timestamp': current_time,
'next_update': current_time + timedelta(minutes=self.update_interval)
}
# 调用回调处理结果
if callback:
await callback(user_id, result, current_time)
except Exception as e:
print(f"❌ 更新{user_id}的预测失败: {e}")
async def _predict_async(self, user_id, current_time):
"""预测的异步包装器"""
loop = asyncio.get_event_loop()
# 在线程池中运行预测以避免阻塞
result = await loop.run_in_executor(
None,
self._make_prediction,
user_id,
current_time.strftime('%Y-%m-%d %H:%M:%S')
)
return result
def _make_prediction(self, user_id, timestamp_str):
"""进行实际预测"""
return self.base_agent.invoke({
"task": "next_position",
"user_id": user_id,
"t": timestamp_str
})
# 使用示例
async def prediction_callback(user_id, result, timestamp):
"""处理新预测的回调"""
print(f"📍 {timestamp.strftime('%H:%M:%S')} - {user_id}: {result.get('predicted_location')}")
# 这里你可以:
# - 发送到实时仪表板
# - 存储到数据库
# - 触发通知
# - 更新移动应用
async def run_streaming_example():
# 设置基础智能体(与前面的示例相同)
# agent = build_mrra_agent(...)
# 创建流式预测器
# streaming_predictor = StreamingMobilityPredictor(agent, update_interval_minutes=15)
# 为用户开始流式处理
# user_ids = ['user_1', 'user_2', 'user_3']
# await streaming_predictor.start_streaming(user_ids, prediction_callback)
pass
# 运行流式预测
# asyncio.run(run_streaming_example())行业特定示例
智慧城市交通管理
class TrafficManagementSystem:
"""使用MRRA预测的智慧城市交通管理"""
def __init__(self, mrra_agent):
self.agent = mrra_agent
self.congestion_threshold = 0.7
self.prediction_horizon_minutes = 30
def predict_traffic_hotspots(self, user_data, current_time):
"""预测交通拥堵热点"""
# 获取所有用户的预测
future_time = current_time + timedelta(minutes=self.prediction_horizon_minutes)
predicted_locations = []
for user_id in user_data.keys():
try:
result = self.agent.invoke({
"task": "next_position",
"user_id": user_id,
"t": future_time.strftime('%Y-%m-%d %H:%M:%S')
})
location = result.get('predicted_location')
if location:
predicted_locations.append(location)
except Exception as e:
print(f"{user_id}的预测失败: {e}")
# 计算每个位置的预测数量
from collections import Counter
location_counts = Counter(predicted_locations)
# 识别热点
total_users = len(user_data)
hotspots = []
for location, count in location_counts.items():
congestion_level = count / total_users
if congestion_level > self.congestion_threshold:
hotspots.append({
'location': location,
'predicted_users': count,
'congestion_level': congestion_level,
'severity': 'high' if congestion_level > 0.8 else 'medium'
})
return sorted(hotspots, key=lambda x: x['congestion_level'], reverse=True)
def generate_traffic_recommendations(self, hotspots):
"""生成交通管理建议"""
recommendations = []
for hotspot in hotspots:
location = hotspot['location']
severity = hotspot['severity']
if severity == 'high':
recommendations.append({
'location': location,
'action': 'increase_signal_timing',
'description': f'在{location}增加绿灯时长',
'priority': 'high'
})
recommendations.append({
'location': location,
'action': 'deploy_traffic_control',
'description': f'在{location}部署交通警察',
'priority': 'high'
})
elif severity == 'medium':
recommendations.append({
'location': location,
'action': 'alternative_routes',
'description': f'建议绕过{location}的替代路线',
'priority': 'medium'
})
return recommendations
# 使用
# traffic_system = TrafficManagementSystem(agent)
# hotspots = traffic_system.predict_traffic_hotspots(user_data, datetime.now())
# recommendations = traffic_system.generate_traffic_recommendations(hotspots)示例数据源:
- Geolife数据集:带GPS轨迹的研究数据集
- Foursquare签到:基于位置的社交网络数据
- CDR数据:来自电信运营商的通话详单记录
- 公交数据:公共交通GPS轨迹
- 合成数据:用于测试的生成移动性数据
运行示例:
- 用实际API密钥替换
'your-api-key' - 确保您有适当的数据访问权限
- 先从小数据集开始测试
- 开发期间监控API使用和成本
- 广泛使用缓存避免重复LLM调用