MRRA
Resources

Examples

Real-world examples and use cases for MRRA

Examples

This section provides comprehensive examples of using MRRA for various mobility prediction tasks and use cases.

Basic Examples

Simple Next Location Prediction

import pandas as pd
from mrra.data.trajectory import TrajectoryBatch
from mrra.data.activity import ActivityExtractor
from mrra.analysis.activity_purpose import ActivityPurposeAssigner
from mrra.graph.mobility_graph import MobilityGraph, GraphConfig
from mrra.retriever.graph_rag import GraphRAGGenerate
from mrra.agents.builder import build_mrra_agent
from mrra.agents.subagents import make_llm

# Sample trajectory data
data = {
    'user_id': ['user_1', 'user_1', 'user_1', 'user_1'],
    'timestamp': ['2024-01-01 08:00:00', '2024-01-01 12:00:00', '2024-01-01 18:00:00', '2024-01-02 08:30:00'],
    'latitude': [31.2304, 31.2404, 31.2304, 31.2404], 
    'longitude': [121.4737, 121.4837, 121.4737, 121.4837]
}

df = pd.DataFrame(data)
tb = TrajectoryBatch(df)

# Extract activities
acts = ActivityExtractor(tb, radius_m=300, min_dwell_minutes=30).extract()

# Set up LLM
llm_cfg = dict(
    provider='openai',
    model='gpt-4o-mini',
    api_key='your-api-key',
    temperature=0.2
)
llm = make_llm(**llm_cfg)

# Assign purposes
acts = ActivityPurposeAssigner(tb, llm=llm).assign(acts)

# Build mobility graph
cfg = GraphConfig(grid_size_m=200, use_activities=True)
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)

# Create retriever and agent
retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)

reflection_cfg = dict(
    max_round=1,
    subAgents=[
        {"name": "temporal", "prompt": "Select most likely location based on time patterns."},
        {"name": "spatial", "prompt": "Select most likely location based on spatial patterns."}
    ],
    aggregator="confidence_weighted_voting"
)

agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)

# Make prediction
result = agent.invoke({
    "task": "next_position", 
    "user_id": "user_1",
    "t": "2024-01-02 12:00:00"
})

print("Prediction result:", result)

Batch Processing Multiple Users

def process_multiple_users(df, llm_cfg):
    """Process trajectory data for multiple users"""
    
    tb = TrajectoryBatch(df)
    users = tb.users()
    
    print(f"Processing {len(users)} users...")
    
    # Extract activities for all users
    acts = ActivityExtractor(tb, radius_m=300, min_dwell_minutes=30).extract()
    
    # Assign purposes with concurrency
    llm = make_llm(**llm_cfg)
    acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=8).assign(acts)
    
    # Build graph
    cfg = GraphConfig(grid_size_m=200, use_activities=True)
    mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
    
    # Create agent
    retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)
    
    reflection_cfg = dict(
        max_round=1,
        subAgents=[
            {"name": "temporal", "prompt": "Focus on time patterns."},
            {"name": "spatial", "prompt": "Focus on location patterns."}
        ],
        aggregator="confidence_weighted_voting"
    )
    
    agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)
    
    # Make predictions for all users
    results = {}
    
    for user_id in users:
        user_data = tb.for_user(user_id)
        last_timestamp = user_data.iloc[-1]['timestamp_local'].strftime('%Y-%m-%d %H:%M:%S')
        
        try:
            result = agent.invoke({
                "task": "next_position",
                "user_id": user_id,
                "t": last_timestamp
            })
            results[user_id] = result
            print(f"✅ Predicted for {user_id}")
            
        except Exception as e:
            print(f"❌ Failed for {user_id}: {e}")
            results[user_id] = None
    
    return results, agent

# Usage
# results, agent = process_multiple_users(df, llm_cfg)

Advanced Examples

Custom Purpose Assignment with Domain Knowledge

# Healthcare mobility analysis
class HealthcarePurposeAssigner(ActivityPurposeAssigner):
    def create_purpose_prompt(self, activity):
        """Custom prompt for healthcare context"""
        
        base_info = f"""
        Activity Location: ({activity.center_lat:.6f}, {activity.center_lon:.6f})
        Duration: {activity.duration_minutes} minutes
        Time: {activity.start.strftime('%A %H:%M')} - {activity.end.strftime('%H:%M')}
        """
        
        healthcare_prompt = f"""
        You are analyzing patient mobility patterns in a healthcare context.
        
        {base_info}
        
        Common healthcare-related purposes include:
        - medical_appointment: Doctor visits, specialist appointments, routine checkups
        - pharmacy: Medication pickup, prescription fulfillment
        - emergency: Emergency room visits, urgent care
        - therapy: Physical therapy, mental health appointments, rehabilitation
        - diagnostic: Lab tests, imaging, medical procedures
        - home: Patient residence, recovery location
        - work: Employment location (if applicable)
        - caregiving: Caring for family members, accompanying others
        - other: Other activities not related to healthcare
        
        Based on the time, duration, and typical healthcare patterns, 
        what is the most likely purpose of this activity?
        
        Respond with only the purpose category (e.g., "medical_appointment").
        """
        
        return healthcare_prompt

# Usage for healthcare data
healthcare_acts = HealthcarePurposeAssigner(tb, llm=llm, concurrency=4).assign(acts)

# Analyze healthcare patterns
healthcare_purposes = [act.purpose for act in healthcare_acts]
purpose_counts = pd.Series(healthcare_purposes).value_counts()
print("Healthcare activity distribution:")
print(purpose_counts)
# University campus mobility analysis
class CampusPurposeAssigner(ActivityPurposeAssigner):
    def create_purpose_prompt(self, activity):
        """Custom prompt for university campus context"""
        
        base_info = f"""
        Activity Location: ({activity.center_lat:.6f}, {activity.center_lon:.6f})
        Duration: {activity.duration_minutes} minutes
        Time: {activity.start.strftime('%A %H:%M')} - {activity.end.strftime('%H:%M')}
        Day of Week: {activity.start.strftime('%A')}
        """
        
        campus_prompt = f"""
        You are analyzing student/faculty mobility patterns on a university campus.
        
        {base_info}
        
        Common campus purposes include:
        - lecture: Attending classes, lectures, seminars
        - study: Library, study rooms, individual study areas
        - research: Lab work, research activities, meetings with advisors
        - dining: Cafeteria, food courts, campus restaurants
        - recreation: Gym, sports facilities, recreational activities
        - social: Student union, social spaces, group activities
        - admin: Administrative offices, registrar, financial aid
        - residence: Dormitory, on-campus housing
        - work: Campus employment, teaching assistant duties
        - transit: Bus stops, parking areas, campus transportation
        - other: Other campus activities
        
        Consider typical academic schedules:
        - Weekday mornings: Often lectures or research
        - Midday: Dining or social activities
        - Afternoons: Labs, study sessions, or work
        - Evenings: Recreation, social activities, or residence
        - Weekends: More social and recreational activities
        
        What is the most likely purpose of this campus activity?
        
        Respond with only the purpose category (e.g., "lecture").
        """
        
        return campus_prompt

# Specialized campus agent configuration
campus_reflection_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "academic_schedule", 
            "prompt": "Focus on academic schedule patterns (class times, semesters, etc.)"
        },
        {
            "name": "campus_geography",
            "prompt": "Focus on campus layout and facility locations"
        },
        {
            "name": "social_patterns",
            "prompt": "Focus on student social and recreational patterns"
        }
    ],
    aggregator="confidence_weighted_voting"
)
# Tourism mobility analysis
class TourismPurposeAssigner(ActivityPurposeAssigner):
    def create_purpose_prompt(self, activity):
        """Custom prompt for tourism context"""
        
        base_info = f"""
        Activity Location: ({activity.center_lat:.6f}, {activity.center_lon:.6f})
        Duration: {activity.duration_minutes} minutes
        Time: {activity.start.strftime('%A %H:%M')} - {activity.end.strftime('%H:%M')}
        """
        
        tourism_prompt = f"""
        You are analyzing tourist mobility patterns in a destination city.
        
        {base_info}
        
        Common tourist purposes include:
        - accommodation: Hotel, hostel, vacation rental check-in/stay
        - sightseeing: Tourist attractions, landmarks, viewpoints
        - cultural: Museums, galleries, cultural sites, historical places
        - dining: Restaurants, local cuisine, food tours
        - shopping: Souvenir shops, local markets, shopping districts
        - entertainment: Shows, concerts, nightlife, bars
        - recreation: Parks, beaches, outdoor activities
        - transportation: Airport, train station, bus stops, taxi stands
        - services: Tourist information, currency exchange, medical
        - business: Meetings, conferences (for business travelers)
        - other: Other tourist activities
        
        Consider typical tourist behavior:
        - Mornings: Often sightseeing or cultural activities
        - Midday: Dining or shopping
        - Afternoons: More sightseeing or recreation
        - Evenings: Dining, entertainment, or accommodation
        - Longer durations (2+ hours): Major attractions or accommodation
        - Shorter durations (<30 min): Services or transportation
        
        What is the most likely purpose of this tourist activity?
        
        Respond with only the purpose category (e.g., "sightseeing").
        """
        
        return tourism_prompt

# Tourism-specific agent with local context
tourism_reflection_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "attraction_expert",
            "prompt": "Focus on tourist attractions and popular destinations",
            "mcp": {
                "maps": {}  # Use POI information for attractions
            }
        },
        {
            "name": "temporal_expert",
            "prompt": "Focus on tourism timing patterns and seasonal behavior"
        },
        {
            "name": "logistics_expert", 
            "prompt": "Focus on tourist logistics (accommodation, transportation, services)"
        }
    ],
    aggregator="confidence_weighted_voting"
)

Real-time Streaming Prediction

import asyncio
from datetime import datetime, timedelta

class StreamingMobilityPredictor:
    """Real-time streaming mobility prediction system"""
    
    def __init__(self, base_agent, update_interval_minutes=30):
        self.base_agent = base_agent
        self.update_interval = update_interval_minutes
        self.last_update = {}
        self.prediction_cache = {}
        
    async def start_streaming(self, user_ids, prediction_callback):
        """Start streaming predictions for multiple users"""
        
        print(f"Starting streaming predictions for {len(user_ids)} users...")
        
        while True:
            tasks = []
            
            for user_id in user_ids:
                tasks.append(self._update_user_prediction(user_id, prediction_callback))
            
            # Process all users concurrently
            await asyncio.gather(*tasks)
            
            # Wait before next update
            await asyncio.sleep(self.update_interval * 60)
    
    async def _update_user_prediction(self, user_id, callback):
        """Update prediction for a single user"""
        
        try:
            current_time = datetime.now()
            
            # Generate prediction
            result = await self._predict_async(user_id, current_time)
            
            # Cache result
            self.prediction_cache[user_id] = {
                'prediction': result,
                'timestamp': current_time,
                'next_update': current_time + timedelta(minutes=self.update_interval)
            }
            
            # Call callback with result
            if callback:
                await callback(user_id, result, current_time)
                
        except Exception as e:
            print(f"❌ Failed to update prediction for {user_id}: {e}")
    
    async def _predict_async(self, user_id, current_time):
        """Async wrapper for prediction"""
        
        loop = asyncio.get_event_loop()
        
        # Run prediction in thread pool to avoid blocking
        result = await loop.run_in_executor(
            None, 
            self._make_prediction, 
            user_id, 
            current_time.strftime('%Y-%m-%d %H:%M:%S')
        )
        
        return result
    
    def _make_prediction(self, user_id, timestamp_str):
        """Make actual prediction"""
        
        return self.base_agent.invoke({
            "task": "next_position",
            "user_id": user_id,
            "t": timestamp_str
        })
    
    def get_latest_prediction(self, user_id):
        """Get latest cached prediction for user"""
        return self.prediction_cache.get(user_id)

# Usage example
async def prediction_callback(user_id, result, timestamp):
    """Callback for handling new predictions"""
    print(f"📍 {timestamp.strftime('%H:%M:%S')} - {user_id}: {result.get('predicted_location')}")
    
    # Here you could:
    # - Send to real-time dashboard
    # - Store in database
    # - Trigger notifications
    # - Update mobile app

async def run_streaming_example():
    # Set up base agent (same as previous examples)
    # agent = build_mrra_agent(...)
    
    # Create streaming predictor
    # streaming_predictor = StreamingMobilityPredictor(agent, update_interval_minutes=15)
    
    # Start streaming for users
    # user_ids = ['user_1', 'user_2', 'user_3']
    # await streaming_predictor.start_streaming(user_ids, prediction_callback)
    
    pass

# Run streaming predictions
# asyncio.run(run_streaming_example())

Evaluation and Benchmarking

from sklearn.metrics import accuracy_score, classification_report
import numpy as np
from datetime import datetime, timedelta

class MobilityPredictor Evaluator:
    """Comprehensive evaluation system for MRRA predictions"""
    
    def __init__(self, agent, test_data):
        self.agent = agent
        self.test_data = test_data
        self.predictions = []
        self.ground_truth = []
        
    def evaluate_next_position_accuracy(self, test_users=None, time_horizons=[1, 2, 4, 8]):
        """Evaluate next position prediction accuracy"""
        
        tb = TrajectoryBatch(self.test_data)
        users = test_users or tb.users()
        
        results = {}
        
        for horizon_hours in time_horizons:
            print(f"Evaluating {horizon_hours}h prediction horizon...")
            
            predictions = []
            ground_truth = []
            
            for user_id in users:
                user_data = tb.for_user(user_id).sort_values('timestamp_local')
                
                # Use first 80% for prediction, last 20% for testing
                split_idx = int(len(user_data) * 0.8)
                
                for i in range(split_idx, len(user_data) - 1):
                    current_time = user_data.iloc[i]['timestamp_local']
                    target_time = current_time + timedelta(hours=horizon_hours)
                    
                    # Find actual location at target time
                    future_data = user_data[user_data['timestamp_local'] >= target_time]
                    if len(future_data) == 0:
                        continue
                    
                    actual_location = future_data.iloc[0]
                    actual_grid = f"g_{int(actual_location['grid_y'])}_{int(actual_location['grid_x'])}"
                    
                    # Make prediction
                    try:
                        result = self.agent.invoke({
                            "task": "next_position",
                            "user_id": user_id,
                            "t": target_time.strftime('%Y-%m-%d %H:%M:%S')
                        })
                        
                        predicted_location = result.get('predicted_location', 'unknown')
                        
                        predictions.append(predicted_location)
                        ground_truth.append(actual_grid)
                        
                    except Exception as e:
                        print(f"Prediction failed for {user_id} at {target_time}: {e}")
                        continue
            
            # Calculate accuracy
            if predictions and ground_truth:
                accuracy = accuracy_score(ground_truth, predictions)
                
                results[f'{horizon_hours}h'] = {
                    'accuracy': accuracy,
                    'total_predictions': len(predictions),
                    'unique_locations': len(set(ground_truth)),
                    'predictions': predictions,
                    'ground_truth': ground_truth
                }
                
                print(f"  Accuracy: {accuracy:.3f} ({len(predictions)} predictions)")
            else:
                results[f'{horizon_hours}h'] = {
                    'accuracy': 0.0,
                    'total_predictions': 0,
                    'error': 'No valid predictions'
                }
        
        return results
    
    def evaluate_purpose_prediction(self):
        """Evaluate activity purpose prediction accuracy"""
        
        tb = TrajectoryBatch(self.test_data)
        
        # Extract activities
        acts = ActivityExtractor(tb, radius_m=300, min_dwell_minutes=30).extract()
        
        # Manually labeled activities (you would need to provide these)
        # labeled_purposes = load_ground_truth_purposes()
        
        # For demo, create synthetic ground truth
        synthetic_purposes = []
        for act in acts:
            hour = act.start.hour
            duration = act.duration_minutes
            
            # Simple heuristic for demo
            if 7 <= hour <= 9 or 17 <= hour <= 19:
                synthetic_purposes.append('commute')
            elif 9 <= hour <= 17 and duration > 240:
                synthetic_purposes.append('work')
            elif 12 <= hour <= 14:
                synthetic_purposes.append('dining')
            elif 18 <= hour <= 23:
                synthetic_purposes.append('social')
            else:
                synthetic_purposes.append('other')
        
        # Predict purposes with MRRA
        llm = make_llm(**{'provider': 'openai', 'model': 'gpt-4o-mini', 'api_key': 'your-key'})
        predicted_acts = ActivityPurposeAssigner(tb, llm=llm).assign(acts)
        predicted_purposes = [act.purpose for act in predicted_acts]
        
        # Evaluate
        if len(predicted_purposes) == len(synthetic_purposes):
            accuracy = accuracy_score(synthetic_purposes, predicted_purposes)
            report = classification_report(synthetic_purposes, predicted_purposes)
            
            return {
                'accuracy': accuracy,
                'classification_report': report,
                'ground_truth': synthetic_purposes,
                'predictions': predicted_purposes
            }
        else:
            return {'error': 'Mismatch in prediction counts'}
    
    def benchmark_performance(self, num_iterations=10):
        """Benchmark prediction performance (speed and consistency)"""
        
        tb = TrajectoryBatch(self.test_data)
        user_id = tb.users()[0]
        
        times = []
        results = []
        
        for i in range(num_iterations):
            start_time = datetime.now()
            
            try:
                result = self.agent.invoke({
                    "task": "next_position",
                    "user_id": user_id,
                    "t": "2024-01-01 12:00:00"
                })
                
                end_time = datetime.now()
                elapsed = (end_time - start_time).total_seconds()
                
                times.append(elapsed)
                results.append(result.get('predicted_location'))
                
            except Exception as e:
                print(f"Benchmark iteration {i} failed: {e}")
        
        return {
            'avg_time': np.mean(times),
            'std_time': np.std(times),
            'min_time': np.min(times),
            'max_time': np.max(times),
            'consistency': len(set(results)) / len(results),  # Lower is more consistent
            'success_rate': len(results) / num_iterations
        }

# Usage example
def run_evaluation_example(agent, test_df):
    """Run comprehensive evaluation"""
    
    evaluator = MobilityPredictorEvaluator(agent, test_df)
    
    print("🔍 Evaluating next position accuracy...")
    accuracy_results = evaluator.evaluate_next_position_accuracy(time_horizons=[1, 4, 8])
    
    print("\n📊 Accuracy Results:")
    for horizon, result in accuracy_results.items():
        if 'accuracy' in result:
            print(f"  {horizon}: {result['accuracy']:.3f} ({result['total_predictions']} predictions)")
    
    print("\n🎯 Evaluating purpose prediction...")
    purpose_results = evaluator.evaluate_purpose_prediction()
    
    if 'accuracy' in purpose_results:
        print(f"Purpose accuracy: {purpose_results['accuracy']:.3f}")
    
    print("\n⚡ Benchmarking performance...")
    benchmark_results = evaluator.benchmark_performance()
    
    print(f"Average prediction time: {benchmark_results['avg_time']:.2f}s")
    print(f"Success rate: {benchmark_results['success_rate']:.3f}")
    print(f"Consistency score: {benchmark_results['consistency']:.3f}")
    
    return {
        'accuracy': accuracy_results,
        'purpose': purpose_results,
        'benchmark': benchmark_results
    }

# Run evaluation
# evaluation_results = run_evaluation_example(agent, test_df)

Industry-Specific Examples

Smart City Traffic Management

class TrafficManagementSystem:
    """Smart city traffic management using MRRA predictions"""
    
    def __init__(self, mrra_agent):
        self.agent = mrra_agent
        self.congestion_threshold = 0.7
        self.prediction_horizon_minutes = 30
        
    def predict_traffic_hotspots(self, user_data, current_time):
        """Predict traffic congestion hotspots"""
        
        # Get predictions for all users
        future_time = current_time + timedelta(minutes=self.prediction_horizon_minutes)
        
        predicted_locations = []
        
        for user_id in user_data.keys():
            try:
                result = self.agent.invoke({
                    "task": "next_position",
                    "user_id": user_id,
                    "t": future_time.strftime('%Y-%m-%d %H:%M:%S')
                })
                
                location = result.get('predicted_location')
                if location:
                    predicted_locations.append(location)
                    
            except Exception as e:
                print(f"Failed prediction for {user_id}: {e}")
        
        # Count predictions per location
        from collections import Counter
        location_counts = Counter(predicted_locations)
        
        # Identify hotspots
        total_users = len(user_data)
        hotspots = []
        
        for location, count in location_counts.items():
            congestion_level = count / total_users
            
            if congestion_level > self.congestion_threshold:
                hotspots.append({
                    'location': location,
                    'predicted_users': count,
                    'congestion_level': congestion_level,
                    'severity': 'high' if congestion_level > 0.8 else 'medium'
                })
        
        return sorted(hotspots, key=lambda x: x['congestion_level'], reverse=True)
    
    def generate_traffic_recommendations(self, hotspots):
        """Generate traffic management recommendations"""
        
        recommendations = []
        
        for hotspot in hotspots:
            location = hotspot['location']
            severity = hotspot['severity']
            
            if severity == 'high':
                recommendations.append({
                    'location': location,
                    'action': 'increase_signal_timing',
                    'description': f'Increase green light duration at {location}',
                    'priority': 'high'
                })
                
                recommendations.append({
                    'location': location,
                    'action': 'deploy_traffic_control',
                    'description': f'Deploy traffic officers to {location}',
                    'priority': 'high'
                })
            
            elif severity == 'medium':
                recommendations.append({
                    'location': location,
                    'action': 'alternative_routes',
                    'description': f'Suggest alternative routes around {location}',
                    'priority': 'medium'
                })
        
        return recommendations

# Usage
# traffic_system = TrafficManagementSystem(agent)
# hotspots = traffic_system.predict_traffic_hotspots(user_data, datetime.now())
# recommendations = traffic_system.generate_traffic_recommendations(hotspots)

Retail Location Analytics

class RetailLocationAnalyzer:
    """Retail location analytics using mobility predictions"""
    
    def __init__(self, mrra_agent):
        self.agent = mrra_agent
        
    def analyze_store_catchment(self, store_location, user_data, time_range_hours=24):
        """Analyze catchment area for a retail store"""
        
        catchment_users = []
        
        for user_id in user_data.keys():
            # Predict user movements over time range
            user_predictions = []
            
            base_time = datetime.now()
            
            for hour_offset in range(0, time_range_hours, 2):  # Every 2 hours
                prediction_time = base_time + timedelta(hours=hour_offset)
                
                try:
                    result = self.agent.invoke({
                        "task": "next_position",
                        "user_id": user_id,
                        "t": prediction_time.strftime('%Y-%m-%d %H:%M:%S')
                    })
                    
                    predicted_location = result.get('predicted_location')
                    if predicted_location == store_location:
                        user_predictions.append({
                            'time': prediction_time,
                            'confidence': result.get('confidence', 0.5)
                        })
                        
                except Exception as e:
                    continue
            
            if user_predictions:
                catchment_users.append({
                    'user_id': user_id,
                    'visit_predictions': user_predictions,
                    'total_visits': len(user_predictions),
                    'avg_confidence': np.mean([p['confidence'] for p in user_predictions])
                })
        
        return {
            'store_location': store_location,
            'catchment_size': len(catchment_users),
            'total_predicted_visits': sum(user['total_visits'] for user in catchment_users),
            'detailed_users': catchment_users
        }
    
    def recommend_store_locations(self, candidate_locations, user_data):
        """Recommend optimal store locations"""
        
        location_scores = []
        
        for location in candidate_locations:
            catchment = self.analyze_store_catchment(location, user_data)
            
            score = {
                'location': location,
                'catchment_size': catchment['catchment_size'],
                'predicted_visits': catchment['total_predicted_visits'],
                'score': catchment['catchment_size'] * 0.6 + catchment['total_predicted_visits'] * 0.4
            }
            
            location_scores.append(score)
        
        return sorted(location_scores, key=lambda x: x['score'], reverse=True)

# Usage
# retail_analyzer = RetailLocationAnalyzer(agent)
# catchment = retail_analyzer.analyze_store_catchment('g_1234_5678', user_data)
# recommendations = retail_analyzer.recommend_store_locations(['g_1234_5678', 'g_2345_6789'], user_data)

Example Data Sources:

  • Geolife Dataset: Research dataset with GPS trajectories
  • Foursquare Check-ins: Location-based social network data
  • CDR Data: Call detail records from telecom providers
  • Transit Data: Public transportation GPS traces
  • Synthetic Data: Generated mobility data for testing

Testing and Development Examples

Unit Testing Framework

import unittest
from unittest.mock import Mock, patch
import tempfile
import os

class TestMRRAComponents(unittest.TestCase):
    """Unit tests for MRRA components"""
    
    def setUp(self):
        """Set up test fixtures"""
        
        # Create sample trajectory data
        self.sample_data = pd.DataFrame({
            'user_id': ['test_user'] * 4,
            'timestamp': ['2024-01-01 08:00:00', '2024-01-01 12:00:00', 
                         '2024-01-01 18:00:00', '2024-01-02 08:00:00'],
            'latitude': [31.2304, 31.2404, 31.2304, 31.2404],
            'longitude': [121.4737, 121.4837, 121.4737, 121.4837]
        })
        
        self.tb = TrajectoryBatch(self.sample_data)
        
        # Mock LLM for testing
        self.mock_llm = Mock()
        self.mock_llm.invoke.return_value = "work"  # Mock purpose assignment
        
        # Temporary directory for cache tests
        self.temp_dir = tempfile.mkdtemp()
    
    def tearDown(self):
        """Clean up test fixtures"""
        import shutil
        shutil.rmtree(self.temp_dir, ignore_errors=True)
    
    def test_trajectory_batch_creation(self):
        """Test trajectory batch creation and validation"""
        
        self.assertEqual(len(self.tb.df), 4)
        self.assertEqual(len(self.tb.users()), 1)
        self.assertIn('timestamp_local', self.tb.df.columns)
        self.assertIn('hour', self.tb.df.columns)
        self.assertIn('dow', self.tb.df.columns)
    
    def test_activity_extraction(self):
        """Test activity extraction"""
        
        extractor = ActivityExtractor(self.tb, radius_m=300, min_dwell_minutes=20)
        activities = extractor.extract()
        
        self.assertGreater(len(activities), 0)
        
        for activity in activities:
            self.assertHasAttr(activity, 'start')
            self.assertHasAttr(activity, 'end')
            self.assertHasAttr(activity, 'center_lat')
            self.assertHasAttr(activity, 'center_lon')
    
    def test_purpose_assignment_with_mock(self):
        """Test purpose assignment with mocked LLM"""
        
        activities = ActivityExtractor(self.tb, radius_m=300, min_dwell_minutes=20).extract()
        
        assigner = ActivityPurposeAssigner(self.tb, llm=self.mock_llm)
        assigned_activities = assigner.assign(activities)
        
        self.assertEqual(len(assigned_activities), len(activities))
        
        for activity in assigned_activities:
            self.assertHasAttr(activity, 'purpose')
            self.assertIsNotNone(activity.purpose)
    
    def test_graph_construction(self):
        """Test mobility graph construction"""
        
        activities = ActivityExtractor(self.tb).extract()
        
        # Mock purpose assignment
        for activity in activities:
            activity.purpose = 'test_purpose'
        
        cfg = GraphConfig(grid_size_m=200, use_activities=True)
        mg = MobilityGraph(self.tb, cfg, activities=activities, assume_purposes_assigned=True)
        
        self.assertGreater(mg.G.number_of_nodes(), 0)
        self.assertGreater(mg.G.number_of_edges(), 0)
        
        # Check for expected node types
        node_types = set()
        for node in mg.G.nodes():
            node_type = node.split('_')[0]
            node_types.add(node_type)
        
        self.assertIn('u', node_types)  # User nodes
        self.assertIn('g', node_types)  # Grid/location nodes
    
    def test_cache_operations(self):
        """Test cache save and load operations"""
        
        from mrra.persist.cache import CacheManager, compute_tb_hash
        
        cm = CacheManager(base_dir=self.temp_dir)
        tb_hash = compute_tb_hash(self.tb)
        
        activities = ActivityExtractor(self.tb).extract()
        
        # Test activity caching
        cm.save_activities(tb_hash, "test", activities)
        loaded_activities = cm.load_activities(tb_hash, "test")
        
        self.assertEqual(len(loaded_activities), len(activities))
        
        # Test JSON caching
        test_data = {"key": "value", "count": 42}
        cm.save_json(tb_hash, "test_json", test_data)
        loaded_data = cm.load_json(tb_hash, "test_json")
        
        self.assertEqual(loaded_data, test_data)
    
    @patch('mrra.agents.subagents.make_llm')
    def test_agent_creation(self, mock_make_llm):
        """Test agent creation with mocked dependencies"""
        
        # Mock LLM
        mock_llm_instance = Mock()
        mock_make_llm.return_value = mock_llm_instance
        
        # Create minimal graph
        activities = ActivityExtractor(self.tb).extract()
        for activity in activities:
            activity.purpose = 'test'
        
        cfg = GraphConfig(grid_size_m=200, use_activities=True)
        mg = MobilityGraph(self.tb, cfg, activities=activities, assume_purposes_assigned=True)
        
        # Create retriever
        retriever = GraphRAGGenerate(tb=self.tb, mobility_graph=mg)
        
        # Create agent
        reflection_cfg = dict(
            max_round=1,
            subAgents=[{"name": "test", "prompt": "test prompt"}],
            aggregator="confidence_weighted_voting"
        )
        
        llm_cfg = {"provider": "test", "model": "test"}
        agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)
        
        self.assertIsNotNone(agent)

# Run the unit tests when this file is executed directly
# (e.g. `python test_examples.py`); harmless when imported as a module.
if __name__ == '__main__':
    unittest.main()

Integration Testing

class TestMRRAIntegration(unittest.TestCase):
    """End-to-end integration tests for the full MRRA pipeline.

    Skipped unless MRRA_INTEGRATION_TEST is set, since a real LLM API
    is required.
    """

    @unittest.skipIf(not os.getenv('MRRA_INTEGRATION_TEST'), "Integration tests require MRRA_INTEGRATION_TEST=1")
    def test_complete_workflow(self):
        """Run data -> activities -> purposes -> graph -> agent -> prediction."""

        # Real LLM API access is needed; bail out early without a key.
        llm_config = {
            'provider': 'openai',
            'model': 'gpt-4o-mini',
            'api_key': os.getenv('OPENAI_API_KEY'),
            'temperature': 0.2
        }
        if not llm_config['api_key']:
            self.skipTest("OpenAI API key required for integration test")

        # Six pings for one synthetic user across two days.
        frame = pd.DataFrame({
            'user_id': ['integration_user'] * 6,
            'timestamp': [
                '2024-01-01 08:00:00', '2024-01-01 09:00:00', '2024-01-01 12:00:00',
                '2024-01-01 13:00:00', '2024-01-01 18:00:00', '2024-01-02 08:00:00'
            ],
            'latitude': [31.2304, 31.2354, 31.2404, 31.2454, 31.2304, 31.2354],
            'longitude': [121.4737, 121.4787, 121.4837, 121.4887, 121.4737, 121.4787]
        })
        batch = TrajectoryBatch(frame)

        # Stage 1: stay-point / activity extraction.
        activities = ActivityExtractor(batch, radius_m=200, min_dwell_minutes=30).extract()
        self.assertGreater(len(activities), 0, "Should extract at least one activity")

        # Stage 2: LLM-based purpose labelling.
        llm = make_llm(**llm_config)
        activities = ActivityPurposeAssigner(batch, llm=llm, concurrency=2).assign(activities)

        labels = [getattr(act, 'purpose', None) for act in activities]
        self.assertTrue(any(label != 'Other' for label in labels),
                        "Should assign specific purposes, not just 'Other'")

        # Stage 3: mobility-graph construction.
        graph_cfg = GraphConfig(grid_size_m=200, use_activities=True)
        graph = MobilityGraph(batch, graph_cfg, activities=activities, assume_purposes_assigned=True)
        self.assertGreater(graph.G.number_of_nodes(), 0, "Graph should have nodes")

        # Stage 4: retrieval plus multi-agent prediction.
        rag = GraphRAGGenerate(tb=batch, mobility_graph=graph)
        reflection = {
            'max_round': 1,
            'subAgents': [
                {"name": "temporal", "prompt": "Select most likely location based on time patterns."}
            ],
            'aggregator': "confidence_weighted_voting"
        }
        mrra_agent = build_mrra_agent(llm=llm_config, retriever=rag, reflection=reflection)

        prediction = mrra_agent.invoke({
            "task": "next_position",
            "user_id": "integration_user",
            "t": "2024-01-02 12:00:00"
        })

        self.assertIn('predicted_location', prediction, "Should return predicted location")
        self.assertIsNotNone(prediction['predicted_location'], "Predicted location should not be None")

# Run integration tests with: MRRA_INTEGRATION_TEST=1 python -m unittest test_examples.TestMRRAIntegration

Running Examples:

  • Replace 'your-api-key' with actual API keys
  • Ensure you have proper data access permissions
  • Start with small datasets for testing
  • Monitor API usage and costs during development
  • Use caching extensively to avoid repeated LLM calls

Next Steps