Troubleshooting

This guide covers common issues, their solutions, and best practices for using MRRA effectively.

Common Issues

Activity Purpose Issues

Problem: All activity purposes are assigned as "Other" (or its Chinese equivalent, "其他")

Common Causes:

  • Passing LLM configuration dictionary instead of LLM client object
  • LLM API authentication issues
  • Graph construction calling LLM again instead of using cached purposes

Solution:

# ❌ Incorrect: passing config dictionary
acts = ActivityPurposeAssigner(tb, llm=llm_cfg, concurrency=8).assign(acts)

# ✅ Correct: pass LLM client object
from mrra.agents.subagents import make_llm
llm = make_llm(**llm_cfg)  # Create client object
acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=8).assign(acts)

# ✅ Ensure graph construction uses cached purposes
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)

Verification:

# Check if purposes were assigned correctly
purpose_counts = {}
for act in acts:
    purpose = getattr(act, 'purpose', 'No purpose')
    purpose_counts[purpose] = purpose_counts.get(purpose, 0) + 1

print("Purpose distribution:", purpose_counts)

# Should see variety of purposes, not just "Other"

Problem: LLM API connection failures or timeouts

Common Causes:

  • Invalid API keys or base URLs
  • Network connectivity issues
  • Rate limiting from LLM provider
  • Incorrect model names

Solution:

# Test LLM connection first
def test_llm_connection(llm_cfg):
    try:
        llm = make_llm(**llm_cfg)
        test_response = llm.invoke("Test message")
        print("✅ LLM connection successful")
        return True
    except Exception as e:
        print(f"❌ LLM connection failed: {e}")
        return False

# Test before using in purpose assignment
if test_llm_connection(llm_cfg):
    llm = make_llm(**llm_cfg)
    acts = ActivityPurposeAssigner(
        tb, 
        llm=llm, 
        concurrency=4,  # Reduce concurrency if rate limited
        llm_timeout=120  # Increase timeout
    ).assign(acts)

Rate Limit Handling:

# Handle rate limits with retry logic
import time
import random

def assign_purposes_with_retry(tb, llm, acts, max_retries=3):
    for attempt in range(max_retries):
        try:
            return ActivityPurposeAssigner(
                tb, 
                llm=llm, 
                concurrency=2,  # Lower concurrency
                llm_timeout=60
            ).assign(acts)
        except Exception as e:
            if "rate limit" in str(e).lower() and attempt < max_retries - 1:
                wait_time = (2 ** attempt) + random.random()
                print(f"Rate limited, waiting {wait_time:.1f}s before retry {attempt + 1}")
                time.sleep(wait_time)
            else:
                raise  # re-raise, preserving the original traceback
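
# Usage: wrap the assignment call when the provider throttles requests
acts = assign_purposes_with_retry(tb, llm, acts)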

Problem: Assigned purposes are vague or incorrect

Common Causes:

  • Insufficient location context
  • Poor quality trajectory data
  • Generic prompts not tailored to domain

Solution:

# Enhance purpose assignment with better context
class EnhancedActivityPurposeAssigner(ActivityPurposeAssigner):
    def __init__(self, *args, custom_prompt=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.custom_prompt = custom_prompt
    
    def create_purpose_prompt(self, activity):
        """Create enhanced prompt with more context"""
        
        base_prompt = super().create_purpose_prompt(activity)
        
        if self.custom_prompt:
            enhanced_prompt = f"""
            {self.custom_prompt}
            
            {base_prompt}
            
            Additional Context:
            - Duration: {activity.duration_minutes} minutes
            - Time: {activity.start} to {activity.end}
            - Day of week: {activity.start.strftime('%A')}
            - Hour: {activity.start.hour}
            
            Common purposes include: work, home, dining, shopping, leisure, 
            healthcare, education, transportation, social, business.
            """
        else:
            enhanced_prompt = base_prompt
        
        return enhanced_prompt

# Use enhanced assigner
custom_prompt = """
You are analyzing mobility data for urban professionals. 
Consider typical daily routines and local context when assigning purposes.
"""

enhanced_assigner = EnhancedActivityPurposeAssigner(
    tb, 
    llm=llm, 
    custom_prompt=custom_prompt,
    concurrency=4
)
acts = enhanced_assigner.assign(acts)

Graph and Retrieval Issues

Graph Saving Errors

Problem: AttributeError: module 'networkx' has no attribute 'write_gpickle' (write_gpickle was removed in NetworkX 3.0)

Solution: MRRA serializes graphs with standard pickle; use the CacheManager instead of calling networkx directly:

# ❌ Don't use networkx write_gpickle directly
# nx.write_gpickle(G, "graph.gpickle")

# ✅ Use CacheManager for graph operations
from mrra.persist.cache import CacheManager

cm = CacheManager()
cm.save_graph(tb_hash, "mobility_default", mg.G)
cached_graph = cm.load_graph(tb_hash, "mobility_default")

Purpose-Based Retrieval Not Working

Problem: Purpose context not influencing retrieval results

Solution: Ensure purpose nodes exist in graph and configure retrieval weights:

# Check if purpose nodes exist in graph
purpose_nodes = [n for n in mg.G.nodes() if n.startswith('p_')]
print(f"Purpose nodes in graph: {purpose_nodes}")

# Configure retrieval weights for purposes
from mrra.retriever.graph_rag import GraphRAGGenerate

retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)
retriever.purpose_weight = 0.6   # Increase purpose influence
retriever.hour_weight = 0.4
retriever.dow_weight = 0.3

# Use purpose in query
docs = retriever.get_relevant_documents({
    "user_id": user_id,
    "purpose": "dining",  # Or ["dining", "work"]
    "k": 8
})

# Verify purpose influence
for doc in docs:
    print(f"Location: {doc.metadata['node']}, Score: {doc.metadata['score']:.4f}")

Performance and Memory Issues

Slow Activity Purpose Assignment

Problem: Purpose assignment takes too long or uses too much memory

Solution: Optimize concurrency and implement batching:

# Optimize concurrency based on your system
import os
import psutil

def get_optimal_concurrency():
    """Calculate optimal concurrency based on system resources"""
    cpu_count = os.cpu_count()
    memory_gb = psutil.virtual_memory().total / (1024**3)
    
    # Conservative approach: limit based on memory and CPU
    max_concurrent = min(
        cpu_count * 2,  # CPU-based limit
        int(memory_gb),  # Memory-based limit (1 GB per worker)
        10  # Hard limit to prevent API overload
    )
    
    return max(1, max_concurrent)

# Use optimal concurrency
optimal_concurrency = get_optimal_concurrency()
print(f"Using concurrency: {optimal_concurrency}")

acts = ActivityPurposeAssigner(
    tb, 
    llm=llm, 
    concurrency=optimal_concurrency,
    llm_timeout=60  # Reasonable timeout
).assign(acts)

Memory Issues with Large Datasets

Problem: Out of memory errors with large trajectory datasets

Solution: Implement batch processing and memory management:

import gc

def process_large_dataset_in_batches(tb, batch_size=1000):
    """Process large trajectory datasets in batches"""
    
    total_points = len(tb.df)
    results = []
    
    for i in range(0, total_points, batch_size):
        batch_df = tb.df.iloc[i:i+batch_size].copy()
        batch_tb = TrajectoryBatch(batch_df)
        
        print(f"Processing batch {i//batch_size + 1}: {len(batch_df)} points")
        
        # Process batch
        batch_acts = ActivityExtractor(batch_tb).extract()
        batch_acts = ActivityPurposeAssigner(batch_tb, llm=llm, concurrency=2).assign(batch_acts)
        
        results.extend(batch_acts)
        
        # Free memory between batches
        gc.collect()
    
    return results

# Use for large datasets
if len(tb.df) > 10000:
    print("Large dataset detected, using batch processing")
    acts = process_large_dataset_in_batches(tb, batch_size=2000)
else:
    acts = ActivityExtractor(tb).extract()
    acts = ActivityPurposeAssigner(tb, llm=llm).assign(acts)

Data Quality Issues

Insufficient Activity Detection

Problem: Too few activities detected from trajectory data

Solution: Adjust extraction parameters:

# Analyze data characteristics first
def analyze_trajectory_characteristics(tb):
    """Analyze trajectory data to suggest parameters"""
    
    df = tb.df
    
    # Calculate basic statistics
    stats = {
        'total_points': len(df),
        'unique_users': df['user_id'].nunique(),
        'time_span_days': (df['timestamp_local'].max() - df['timestamp_local'].min()).days,
        'avg_points_per_user': len(df) / df['user_id'].nunique(),
    }
    
    # Calculate typical distances between consecutive points
    # (sampled to ~1000 pairs to keep memory bounded)
    distances = []
    for user_id in df['user_id'].unique():
        user_df = df[df['user_id'] == user_id].sort_values('timestamp_local')
        
        for i in range(1, len(user_df)):
            prev_row = user_df.iloc[i-1]
            curr_row = user_df.iloc[i]
            
            # Equirectangular approximation: 1 degree ≈ 111 km
            lat_diff = curr_row['latitude'] - prev_row['latitude']
            lon_diff = curr_row['longitude'] - prev_row['longitude']
            distance = ((lat_diff**2 + lon_diff**2)**0.5) * 111000  # Rough meters
            
            distances.append(distance)
        
        if len(distances) > 1000:  # Stop sampling once enough pairs are collected
            break
    
    if distances:
        stats['median_distance_m'] = sorted(distances)[len(distances)//2]
        stats['90th_percentile_distance_m'] = sorted(distances)[int(len(distances)*0.9)]
    
    return stats

# Analyze and adjust parameters
stats = analyze_trajectory_characteristics(tb)
print("Trajectory characteristics:", stats)

# Suggest parameters based on data
suggested_radius = min(500, max(100, stats.get('90th_percentile_distance_m', 300)))
suggested_min_dwell = 20 if stats.get('avg_points_per_user', 0) > 1000 else 10

ext_cfg = dict(
    method="radius",
    radius_m=suggested_radius,
    min_dwell_minutes=suggested_min_dwell,
    max_gap_minutes=120  # Increase for sparse data
)

print(f"Suggested extraction config: {ext_cfg}")
acts = ActivityExtractor(tb, **ext_cfg).extract()
print(f"Extracted {len(acts)} activities")

Geolife Dataset Issues

Data Loading Problems

Problem: Issues loading Geolife .plt files

Solution: Verify data structure and use proper loading:

def verify_geolife_structure(data_dir):
    """Verify Geolife data structure"""
    import os
    import glob
    
    expected_structure = {}
    
    user_dirs = glob.glob(os.path.join(data_dir, "*"))
    for user_dir in user_dirs:
        if not os.path.isdir(user_dir):
            continue
            
        user_name = os.path.basename(user_dir)
        trajectory_dir = os.path.join(user_dir, "Trajectory")
        
        if os.path.exists(trajectory_dir):
            plt_files = glob.glob(os.path.join(trajectory_dir, "*.plt"))
            expected_structure[user_name] = len(plt_files)
        else:
            expected_structure[user_name] = 0
    
    print("Geolife data structure:")
    for user, file_count in expected_structure.items():
        print(f"  {user}: {file_count} .plt files")
    
    return expected_structure

# Verify before running scripts
verify_geolife_structure("scripts/Data")

# Run with specific user
import os
os.environ['GEOLIFE_USER'] = 'user_001'  # Specify user

Script Execution Issues

Problem: scripts/verify_geolife.py fails to run

Solution: Check environment and dependencies:

# Ensure you're in the right directory
cd /path/to/mrra/project

# Check if script exists
ls scripts/verify_geolife.py

# Run with Python path
python scripts/verify_geolife.py

# If import errors, check installation
pip install -e .

# Check required dependencies
pip list | grep -E "(pandas|numpy|networkx|langchain)"

Best Practices

Development Workflow

def robust_mrra_pipeline(df, llm_cfg, debug=True):
    """Robust MRRA pipeline with error handling and validation"""
    
    try:
        # 1. Validate input data
        if debug:
            print("🔍 Validating input data...")
        
        required_cols = ['user_id', 'timestamp', 'latitude', 'longitude']
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")
        
        if len(df) == 0:
            raise ValueError("Empty trajectory data")
        
        # 2. Initialize with caching
        tb = TrajectoryBatch(df)
        cm = CacheManager()
        tb_hash = compute_tb_hash(tb)
        
        if debug:
            print(f"📊 Processing {len(df)} trajectory points for {tb.df['user_id'].nunique()} users")
            print(f"🔑 Cache key: {tb_hash}")
        
        # 3. Try cached activities first
        activities_key = "robust_pipeline_activities"
        acts = cm.load_activities(tb_hash, activities_key)
        
        if acts:
            if debug:
                print(f"✅ Loaded {len(acts)} cached activities")
        else:
            if debug:
                print("📊 Computing activities...")
            
            # Test LLM connection
            if not test_llm_connection(llm_cfg):
                raise RuntimeError("LLM connection failed")
            
            # Extract activities
            ext_cfg = dict(method="radius", radius_m=300, min_dwell_minutes=30)
            acts = ActivityExtractor(tb, **ext_cfg).extract()
            
            if len(acts) == 0:
                print("⚠️ No activities detected, adjusting parameters...")
                ext_cfg['min_dwell_minutes'] = 15
                ext_cfg['radius_m'] = 500
                acts = ActivityExtractor(tb, **ext_cfg).extract()
            
            # Assign purposes
            llm = make_llm(**llm_cfg)
            acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=4).assign(acts)
            
            # Cache results
            cm.save_activities(tb_hash, activities_key, acts)
            
            if debug:
                print(f"💾 Cached {len(acts)} activities")
        
        # 4. Build graph
        graph_key = "robust_pipeline_graph"
        cached_graph = cm.load_graph(tb_hash, graph_key)
        
        if cached_graph:
            if debug:
                print("✅ Loaded cached mobility graph")
            cfg = GraphConfig(grid_size_m=200, use_activities=True)
            mg = MobilityGraph(tb, cfg)
            mg.G = cached_graph
        else:
            if debug:
                print("📊 Building mobility graph...")
            cfg = GraphConfig(grid_size_m=200, use_activities=True)
            mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
            cm.save_graph(tb_hash, graph_key, mg.G)
        
        # 5. Create agent
        retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)
        
        reflection_cfg = dict(
            max_round=1,
            subAgents=[
                {"name": "temporal", "prompt": "Select the most likely location id from Options."},
                {"name": "spatial", "prompt": "Select the most likely location id from Options."},
            ],
            aggregator="confidence_weighted_voting"
        )
        
        agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)
        
        if debug:
            print("✅ MRRA pipeline completed successfully")
        
        return agent, {
            'trajectory_batch': tb,
            'activities': acts,
            'mobility_graph': mg,
            'cache_hash': tb_hash
        }
        
    except Exception as e:
        print(f"❌ Pipeline failed: {e}")
        import traceback
        if debug:
            traceback.print_exc()
        raise  # re-raise, preserving the original traceback

# Usage with error handling
try:
    agent, components = robust_mrra_pipeline(df, llm_cfg, debug=True)
    
    # Test prediction
    user_id = components['trajectory_batch'].users()[0]
    result = agent.invoke({
        "task": "next_position",
        "user_id": user_id
    })
    print("✅ Prediction successful:", result)
    
except Exception as e:
    print(f"❌ Error in MRRA pipeline: {e}")

Common Gotchas (consolidated in the sketch after this list):

  • Always use make_llm(**llm_cfg) to create LLM client objects
  • Set assume_purposes_assigned=True when using cached activities in graph construction
  • Cache activities with purposes to avoid expensive LLM re-computation
  • Monitor memory usage with large datasets
  • Test LLM connectivity before running expensive operations
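
A minimal sketch tying these gotchas together, reusing the CacheManager, compute_tb_hash, and assume_purposes_assigned APIs shown in the pipeline above (the cache key "acts_v1" is an arbitrary example; adapt names to your setup):

# Recommended pattern: client object, cached purposes, no LLM re-computation
llm = make_llm(**llm_cfg)                      # client object, not the config dict

cm = CacheManager()
tb_hash = compute_tb_hash(tb)

acts = cm.load_activities(tb_hash, "acts_v1")  # reuse purposes if already cached
if not acts:
    acts = ActivityExtractor(tb).extract()
    acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=4).assign(acts)
    cm.save_activities(tb_hash, "acts_v1", acts)

# Cached purposes: tell the graph builder not to call the LLM again
cfg = GraphConfig(grid_size_m=200, use_activities=True)
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)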

Debugging Tools

Debug Mode Configuration

import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

# MRRA-specific debug flags
debug_config = {
    'verbose_activity_extraction': True,
    'log_llm_calls': True,
    'save_intermediate_results': True,
    'validate_graph_construction': True
}

# Use in pipeline
def debug_mrra_pipeline(df, llm_cfg, debug_config):
    """Pipeline with comprehensive debugging"""
    
    if debug_config.get('verbose_activity_extraction'):
        print("🔍 Activity extraction details:")
        tb = TrajectoryBatch(df)
        
        for user_id in tb.users()[:1]:  # Debug first user
            user_data = tb.for_user(user_id)
            print(f"  User {user_id}: {len(user_data)} points")
            print(f"  Time range: {user_data['timestamp_local'].min()} to {user_data['timestamp_local'].max()}")
    
    # Continue with regular pipeline...

Performance Profiling

import time
import psutil
from functools import wraps

def profile_performance(func):
    """Decorator to profile function performance"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        process = psutil.Process()
        
        start_time = time.time()
        start_memory = process.memory_info().rss / 1024 / 1024  # MB
        
        result = func(*args, **kwargs)
        
        end_time = time.time()
        end_memory = process.memory_info().rss / 1024 / 1024  # MB
        
        print(f"⏱️ {func.__name__}: {end_time - start_time:.2f}s, Memory: {start_memory:.1f}MB → {end_memory:.1f}MB")
        
        return result
    return wrapper

# Use profiling
@profile_performance
def profile_activity_extraction(tb):
    return ActivityExtractor(tb).extract()

@profile_performance  
def profile_purpose_assignment(tb, llm, acts):
    return ActivityPurposeAssigner(tb, llm=llm).assign(acts)

# Profile your pipeline
acts = profile_activity_extraction(tb)
acts = profile_purpose_assignment(tb, llm, acts)

Getting Help:

  • Check existing GitHub issues for similar problems
  • Create minimal reproducible examples when reporting bugs (see the template after this list)
  • Include error messages, stack traces, and environment information
  • Test with smaller datasets first to isolate issues
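
When filing an issue, a self-contained script along these lines makes the problem far easier to reproduce. This is only a template with synthetic data and hypothetical values; import TrajectoryBatch and ActivityExtractor from wherever your installation exposes them:

# Minimal reproducible example template for bug reports
import platform
import pandas as pd

# 1. Environment info to paste into the issue
print("Python:", platform.python_version())
# Also include the output of: pip list | grep -E "(pandas|numpy|networkx|langchain)"

# 2. The smallest synthetic trajectory that still triggers the bug
#    (columns match the required schema listed in the pipeline above)
df = pd.DataFrame({
    "user_id": ["u1"] * 4,
    "timestamp": pd.date_range("2024-01-01 08:00", periods=4, freq="30min"),
    "latitude": [39.90, 39.90, 39.95, 39.95],
    "longitude": [116.40, 116.40, 116.45, 116.45],
})

# 3. The shortest MRRA call chain that fails
tb = TrajectoryBatch(df)
acts = ActivityExtractor(tb, method="radius", radius_m=300, min_dwell_minutes=30).extract()
print(f"Extracted {len(acts)} activities")  # attach the full traceback if this raises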
