MRRA LogoMRRA
资源

示例

MRRA的实际示例和用例

示例

本节提供在各种移动性预测任务和用例中使用MRRA的全面示例。

基础示例

简单的下一位置预测

import pandas as pd
from mrra.data.trajectory import TrajectoryBatch
from mrra.data.activity import ActivityExtractor
from mrra.analysis.activity_purpose import ActivityPurposeAssigner
from mrra.graph.mobility_graph import MobilityGraph, GraphConfig
from mrra.retriever.graph_rag import GraphRAGGenerate
from mrra.agents.builder import build_mrra_agent
from mrra.agents.subagents import make_llm

# 样本轨迹数据
data = {
    'user_id': ['user_1', 'user_1', 'user_1', 'user_1'],
    'timestamp': ['2024-01-01 08:00:00', '2024-01-01 12:00:00', '2024-01-01 18:00:00', '2024-01-02 08:30:00'],
    'latitude': [31.2304, 31.2404, 31.2304, 31.2404], 
    'longitude': [121.4737, 121.4837, 121.4737, 121.4837]
}

df = pd.DataFrame(data)
tb = TrajectoryBatch(df)

# 提取活动
acts = ActivityExtractor(tb, radius_m=300, min_dwell_minutes=30).extract()

# 设置LLM
llm_cfg = dict(
    provider='openai',
    model='gpt-4o-mini',
    api_key='your-api-key',
    temperature=0.2
)
llm = make_llm(**llm_cfg)

# 分配目的
acts = ActivityPurposeAssigner(tb, llm=llm).assign(acts)

# 构建移动图
cfg = GraphConfig(grid_size_m=200, use_activities=True)
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)

# 创建检索器和智能体
retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)

reflection_cfg = dict(
    max_round=1,
    subAgents=[
        {"name": "temporal", "prompt": "基于时间模式选择最可能的位置。"},
        {"name": "spatial", "prompt": "基于空间模式选择最可能的位置。"}
    ],
    aggregator="confidence_weighted_voting"
)

agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)

# 进行预测
result = agent.invoke({
    "task": "next_position", 
    "user_id": "user_1",
    "t": "2024-01-02 12:00:00"
})

print("预测结果:", result)

多用户批处理

def process_multiple_users(df, llm_cfg):
    """处理多个用户的轨迹数据"""
    
    tb = TrajectoryBatch(df)
    users = tb.users()
    
    print(f"处理{len(users)}个用户...")
    
    # 为所有用户提取活动
    acts = ActivityExtractor(tb, radius_m=300, min_dwell_minutes=30).extract()
    
    # 并发分配目的
    llm = make_llm(**llm_cfg)
    acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=8).assign(acts)
    
    # 构建图
    cfg = GraphConfig(grid_size_m=200, use_activities=True)
    mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
    
    # 创建智能体
    retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)
    
    reflection_cfg = dict(
        max_round=1,
        subAgents=[
            {"name": "temporal", "prompt": "专注于时间模式。"},
            {"name": "spatial", "prompt": "专注于位置模式。"}
        ],
        aggregator="confidence_weighted_voting"
    )
    
    agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)
    
    # 为所有用户进行预测
    results = {}
    
    for user_id in users:
        user_data = tb.for_user(user_id)
        last_timestamp = user_data.iloc[-1]['timestamp_local'].strftime('%Y-%m-%d %H:%M:%S')
        
        try:
            result = agent.invoke({
                "task": "next_position",
                "user_id": user_id,
                "t": last_timestamp
            })
            results[user_id] = result
            print(f"✅ 已预测{user_id}")
            
        except Exception as e:
            print(f"❌ {user_id}失败: {e}")
            results[user_id] = None
    
    return results, agent

# 使用方法
# results, agent = process_multiple_users(df, llm_cfg)

高级示例

带领域知识的自定义目的分配

# 医疗保健移动性分析
class HealthcarePurposeAssigner(ActivityPurposeAssigner):
    def create_purpose_prompt(self, activity):
        """医疗保健上下文的自定义提示"""
        
        base_info = f"""
        活动位置: ({activity.center_lat:.6f}, {activity.center_lon:.6f})
        持续时间: {activity.duration_minutes}分钟
        时间: {activity.start.strftime('%A %H:%M')} - {activity.end.strftime('%H:%M')}
        """
        
        healthcare_prompt = f"""
        您正在分析医疗保健上下文中的患者移动模式。
        
        {base_info}
        
        常见的医疗相关目的包括:
        - medical_appointment: 医生就诊、专科预约、例行检查
        - pharmacy: 取药、处方配药
        - emergency: 急诊科就诊、紧急护理
        - therapy: 物理治疗、心理健康预约、康复
        - diagnostic: 实验室检查、影像检查、医疗程序
        - home: 患者住所、康复地点
        - work: 工作场所(如果适用)
        - caregiving: 照顾家庭成员、陪伴他人
        - other: 与医疗保健无关的其他活动
        
        基于时间、持续时间和典型医疗保健模式,
        这个活动最可能的目的是什么?
        
        只回答目的类别(例如"medical_appointment")。
        """
        
        return healthcare_prompt

# 医疗保健数据的使用
healthcare_acts = HealthcarePurposeAssigner(tb, llm=llm, concurrency=4).assign(acts)

# 分析医疗保健模式
healthcare_purposes = [act.purpose for act in healthcare_acts]
purpose_counts = pd.Series(healthcare_purposes).value_counts()
print("医疗保健活动分布:")
print(purpose_counts)
# 大学校园移动性分析
class CampusPurposeAssigner(ActivityPurposeAssigner):
    def create_purpose_prompt(self, activity):
        """大学校园上下文的自定义提示"""
        
        base_info = f"""
        活动位置: ({activity.center_lat:.6f}, {activity.center_lon:.6f})
        持续时间: {activity.duration_minutes}分钟
        时间: {activity.start.strftime('%A %H:%M')} - {activity.end.strftime('%H:%M')}
        星期: {activity.start.strftime('%A')}
        """
        
        campus_prompt = f"""
        您正在分析大学校园中的学生/教职员工移动模式。
        
        {base_info}
        
        常见的校园目的包括:
        - lecture: 参加课程、讲座、研讨会
        - study: 图书馆、学习室、个人学习区域
        - research: 实验室工作、研究活动、与导师会面
        - dining: 食堂、美食广场、校园餐厅
        - recreation: 健身房、体育设施、娱乐活动
        - social: 学生会、社交空间、团体活动
        - admin: 行政办公室、注册处、财务援助
        - residence: 宿舍、校内住宿
        - work: 校园就业、助教职责
        - transit: 公交站、停车场、校园交通
        - other: 其他校园活动
        
        考虑典型的学术安排:
        - 工作日早上:通常是讲座或研究
        - 中午:用餐或社交活动
        - 下午:实验室、学习或工作
        - 晚上:娱乐、社交活动或住宿
        - 周末:更多社交和娱乐活动
        
        这个校园活动最可能的目的是什么?
        
        只回答目的类别(例如"lecture")。
        """
        
        return campus_prompt

# 专门的校园智能体配置
campus_reflection_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "academic_schedule", 
            "prompt": "专注于学术时间安排模式(课程时间、学期等)"
        },
        {
            "name": "campus_geography",
            "prompt": "专注于校园布局和设施位置"
        },
        {
            "name": "social_patterns",
            "prompt": "专注于学生社交和娱乐模式"
        }
    ],
    aggregator="confidence_weighted_voting"
)
# 旅游移动性分析
class TourismPurposeAssigner(ActivityPurposeAssigner):
    def create_purpose_prompt(self, activity):
        """旅游上下文的自定义提示"""
        
        base_info = f"""
        活动位置: ({activity.center_lat:.6f}, {activity.center_lon:.6f})
        持续时间: {activity.duration_minutes}分钟
        时间: {activity.start.strftime('%A %H:%M')} - {activity.end.strftime('%H:%M')}
        """
        
        tourism_prompt = f"""
        您正在分析目的地城市中的游客移动模式。
        
        {base_info}
        
        常见的旅游目的包括:
        - accommodation: 酒店、青年旅社、度假租赁入住/住宿
        - sightseeing: 旅游景点、地标、观景点
        - cultural: 博物馆、画廊、文化遗址、历史地点
        - dining: 餐厅、当地美食、美食之旅
        - shopping: 纪念品商店、当地市场、购物区
        - entertainment: 表演、音乐会、夜生活、酒吧
        - recreation: 公园、海滩、户外活动
        - transportation: 机场、火车站、公交站、出租车站
        - services: 旅游信息、货币兑换、医疗
        - business: 会议、大会(商务旅行者)
        - other: 其他旅游活动
        
        考虑典型的旅游行为:
        - 早上:通常是观光或文化活动
        - 中午:用餐或购物
        - 下午:更多观光或娱乐
        - 晚上:用餐、娱乐或住宿
        - 长时间(2小时以上):主要景点或住宿
        - 短时间(<30分钟):服务或交通
        
        这个旅游活动最可能的目的是什么?
        
        只回答目的类别(例如"sightseeing")。
        """
        
        return tourism_prompt

# 带本地上下文的旅游特定智能体
tourism_reflection_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "attraction_expert",
            "prompt": "专注于旅游景点和热门目的地",
            "mcp": {
                "maps": {}  # 为景点使用POI信息
            }
        },
        {
            "name": "temporal_expert",
            "prompt": "专注于旅游时间模式和季节性行为"
        },
        {
            "name": "logistics_expert", 
            "prompt": "专注于旅游后勤(住宿、交通、服务)"
        }
    ],
    aggregator="confidence_weighted_voting"
)

实时流式预测

import asyncio
from datetime import datetime, timedelta

class StreamingMobilityPredictor:
    """实时流式移动性预测系统"""
    
    def __init__(self, base_agent, update_interval_minutes=30):
        self.base_agent = base_agent
        self.update_interval = update_interval_minutes
        self.last_update = {}
        self.prediction_cache = {}
        
    async def start_streaming(self, user_ids, prediction_callback):
        """为多个用户开始流式预测"""
        
        print(f"为{len(user_ids)}个用户开始流式预测...")
        
        while True:
            tasks = []
            
            for user_id in user_ids:
                tasks.append(self._update_user_prediction(user_id, prediction_callback))
            
            # 并发处理所有用户
            await asyncio.gather(*tasks)
            
            # 等待下次更新
            await asyncio.sleep(self.update_interval * 60)
    
    async def _update_user_prediction(self, user_id, callback):
        """为单个用户更新预测"""
        
        try:
            current_time = datetime.now()
            
            # 生成预测
            result = await self._predict_async(user_id, current_time)
            
            # 缓存结果
            self.prediction_cache[user_id] = {
                'prediction': result,
                'timestamp': current_time,
                'next_update': current_time + timedelta(minutes=self.update_interval)
            }
            
            # 调用回调处理结果
            if callback:
                await callback(user_id, result, current_time)
                
        except Exception as e:
            print(f"❌ 更新{user_id}的预测失败: {e}")
    
    async def _predict_async(self, user_id, current_time):
        """预测的异步包装器"""
        
        loop = asyncio.get_event_loop()
        
        # 在线程池中运行预测以避免阻塞
        result = await loop.run_in_executor(
            None, 
            self._make_prediction, 
            user_id, 
            current_time.strftime('%Y-%m-%d %H:%M:%S')
        )
        
        return result
    
    def _make_prediction(self, user_id, timestamp_str):
        """进行实际预测"""
        
        return self.base_agent.invoke({
            "task": "next_position",
            "user_id": user_id,
            "t": timestamp_str
        })

# 使用示例
async def prediction_callback(user_id, result, timestamp):
    """处理新预测的回调"""
    print(f"📍 {timestamp.strftime('%H:%M:%S')} - {user_id}: {result.get('predicted_location')}")
    
    # 这里你可以:
    # - 发送到实时仪表板
    # - 存储到数据库
    # - 触发通知
    # - 更新移动应用

async def run_streaming_example():
    # 设置基础智能体(与前面的示例相同)
    # agent = build_mrra_agent(...)
    
    # 创建流式预测器
    # streaming_predictor = StreamingMobilityPredictor(agent, update_interval_minutes=15)
    
    # 为用户开始流式处理
    # user_ids = ['user_1', 'user_2', 'user_3']
    # await streaming_predictor.start_streaming(user_ids, prediction_callback)
    
    pass

# 运行流式预测
# asyncio.run(run_streaming_example())

行业特定示例

智慧城市交通管理

class TrafficManagementSystem:
    """使用MRRA预测的智慧城市交通管理"""
    
    def __init__(self, mrra_agent):
        self.agent = mrra_agent
        self.congestion_threshold = 0.7
        self.prediction_horizon_minutes = 30
        
    def predict_traffic_hotspots(self, user_data, current_time):
        """预测交通拥堵热点"""
        
        # 获取所有用户的预测
        future_time = current_time + timedelta(minutes=self.prediction_horizon_minutes)
        
        predicted_locations = []
        
        for user_id in user_data.keys():
            try:
                result = self.agent.invoke({
                    "task": "next_position",
                    "user_id": user_id,
                    "t": future_time.strftime('%Y-%m-%d %H:%M:%S')
                })
                
                location = result.get('predicted_location')
                if location:
                    predicted_locations.append(location)
                    
            except Exception as e:
                print(f"{user_id}的预测失败: {e}")
        
        # 计算每个位置的预测数量
        from collections import Counter
        location_counts = Counter(predicted_locations)
        
        # 识别热点
        total_users = len(user_data)
        hotspots = []
        
        for location, count in location_counts.items():
            congestion_level = count / total_users
            
            if congestion_level > self.congestion_threshold:
                hotspots.append({
                    'location': location,
                    'predicted_users': count,
                    'congestion_level': congestion_level,
                    'severity': 'high' if congestion_level > 0.8 else 'medium'
                })
        
        return sorted(hotspots, key=lambda x: x['congestion_level'], reverse=True)
    
    def generate_traffic_recommendations(self, hotspots):
        """生成交通管理建议"""
        
        recommendations = []
        
        for hotspot in hotspots:
            location = hotspot['location']
            severity = hotspot['severity']
            
            if severity == 'high':
                recommendations.append({
                    'location': location,
                    'action': 'increase_signal_timing',
                    'description': f'在{location}增加绿灯时长',
                    'priority': 'high'
                })
                
                recommendations.append({
                    'location': location,
                    'action': 'deploy_traffic_control',
                    'description': f'在{location}部署交通警察',
                    'priority': 'high'
                })
            
            elif severity == 'medium':
                recommendations.append({
                    'location': location,
                    'action': 'alternative_routes',
                    'description': f'建议绕过{location}的替代路线',
                    'priority': 'medium'
                })
        
        return recommendations

# 使用
# traffic_system = TrafficManagementSystem(agent)
# hotspots = traffic_system.predict_traffic_hotspots(user_data, datetime.now())
# recommendations = traffic_system.generate_traffic_recommendations(hotspots)

示例数据源

  • Geolife数据集:带GPS轨迹的研究数据集
  • Foursquare签到:基于位置的社交网络数据
  • CDR数据:来自电信运营商的通话详单记录
  • 公交数据:公共交通GPS轨迹
  • 合成数据:用于测试的生成移动性数据

运行示例

  • 用实际API密钥替换 'your-api-key'
  • 确保您有适当的数据访问权限
  • 先从小数据集开始测试
  • 开发期间监控API使用和成本
  • 广泛使用缓存避免重复LLM调用

下一步