Troubleshooting
Common issues, solutions, and best practices for MRRA
Troubleshooting
This guide covers common issues, their solutions, and best practices for using MRRA effectively.
Common Issues
Activity Purpose Issues
Problem: All activity purposes are assigned as "Other" or "其他"
Common Causes:
- Passing LLM configuration dictionary instead of LLM client object
- LLM API authentication issues
- Graph construction calling LLM again instead of using cached purposes
Solution:
# ❌ Incorrect: passing config dictionary
acts = ActivityPurposeAssigner(tb, llm=llm_cfg, concurrency=8).assign(acts)
# ✅ Correct: pass LLM client object
from mrra.agents.subagents import make_llm
llm = make_llm(**llm_cfg) # Create client object
acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=8).assign(acts)
# ✅ Ensure graph construction uses cached purposes
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
Verification:
# Check if purposes were assigned correctly
purpose_counts = {}
for act in acts:
purpose = getattr(act, 'purpose', 'No purpose')
purpose_counts[purpose] = purpose_counts.get(purpose, 0) + 1
print("Purpose distribution:", purpose_counts)
# Should see variety of purposes, not just "Other"
Problem: LLM API connection failures or timeouts
Common Causes:
- Invalid API keys or base URLs
- Network connectivity issues
- Rate limiting from LLM provider
- Incorrect model names
Solution:
# Test LLM connection first
def test_llm_connection(llm_cfg):
    """Probe the LLM endpoint with one throwaway request.

    Builds a client with ``make_llm(**llm_cfg)`` and invokes it once with a
    fixed test message. The outcome is printed and also returned.

    Args:
        llm_cfg: mapping of keyword arguments accepted by ``make_llm``.

    Returns:
        bool: True when the client was created and the call returned,
        False when either step raised.
    """
    try:
        client = make_llm(**llm_cfg)
        # The reply itself is irrelevant; only success or failure matters.
        client.invoke("Test message")
        print("✅ LLM connection successful")
    except Exception as exc:
        # Deliberately broad: any failure means "not usable right now".
        print(f"❌ LLM connection failed: {exc}")
        return False
    return True
# Test before using in purpose assignment
if test_llm_connection(llm_cfg):
llm = make_llm(**llm_cfg)
acts = ActivityPurposeAssigner(
tb,
llm=llm,
concurrency=4, # Reduce concurrency if rate limited
llm_timeout=120 # Increase timeout
).assign(acts)
Rate Limit Handling:
# Handle rate limits with retry logic
import time
import random
def assign_purposes_with_retry(tb, llm, acts, max_retries=3):
for attempt in range(max_retries):
try:
return ActivityPurposeAssigner(
tb,
llm=llm,
concurrency=2, # Lower concurrency
llm_timeout=60
).assign(acts)
except Exception as e:
if "rate limit" in str(e).lower() and attempt < max_retries - 1:
wait_time = (2 ** attempt) + random.random()
print(f"Rate limited, waiting {wait_time:.1f}s before retry {attempt + 1}")
time.sleep(wait_time)
else:
raise e
Problem: Assigned purposes are vague or incorrect
Common Causes:
- Insufficient location context
- Poor quality trajectory data
- Generic prompts not tailored to domain
Solution:
# Enhance purpose assignment with better context
class EnhancedActivityPurposeAssigner(ActivityPurposeAssigner):
    """Purpose assigner that can wrap the parent's prompt with extra context.

    With a truthy ``custom_prompt`` the parent's prompt is preceded by that
    text and followed by the activity's duration, time span, weekday and
    start hour plus a list of common purposes. Otherwise the parent's prompt
    is returned unchanged.
    """

    def __init__(self, *args, custom_prompt=None, **kwargs):
        """Store ``custom_prompt``; every other argument goes to the parent."""
        super().__init__(*args, **kwargs)
        self.custom_prompt = custom_prompt

    def create_purpose_prompt(self, activity):
        """Create enhanced prompt with more context.

        Args:
            activity: object read for ``duration_minutes``, ``start`` and
                ``end``; ``start`` must offer ``strftime`` and ``hour``.

        Returns:
            str: the parent's prompt, wrapped with the custom text and the
            activity details when ``custom_prompt`` is truthy.
        """
        # presumably the parent class defines create_purpose_prompt -- verify
        parent_prompt = super().create_purpose_prompt(activity)
        if not self.custom_prompt:
            return parent_prompt

        # NOTE(review): the line layout of this template mirrors the snippet
        # as published; confirm blank lines against the upstream guide.
        return (
            f"\n{self.custom_prompt}\n"
            f"{parent_prompt}\n"
            "Additional Context:\n"
            f"- Duration: {activity.duration_minutes} minutes\n"
            f"- Time: {activity.start} to {activity.end}\n"
            f"- Day of week: {activity.start.strftime('%A')}\n"
            f"- Hour: {activity.start.hour}\n"
            "Common purposes include: work, home, dining, shopping, leisure,\n"
            "healthcare, education, transportation, social, business.\n"
        )
# Use enhanced assigner
custom_prompt = """
You are analyzing mobility data for urban professionals.
Consider typical daily routines and local context when assigning purposes.
"""
enhanced_assigner = EnhancedActivityPurposeAssigner(
tb,
llm=llm,
custom_prompt=custom_prompt,
concurrency=4
)
acts = enhanced_assigner.assign(acts)
Graph and Retrieval Issues
Graph Saving Errors
Problem: Saving a graph fails with "AttributeError: module 'networkx' has no attribute 'write_gpickle'"
Solution: MRRA uses standard pickle for graph serialization:
# ❌ Don't use networkx write_gpickle directly
# nx.write_gpickle(G, "graph.gpickle")
# ✅ Use CacheManager for graph operations
from mrra.persist.cache import CacheManager
cm = CacheManager()
cm.save_graph(tb_hash, "mobility_default", mg.G)
cached_graph = cm.load_graph(tb_hash, "mobility_default")
Purpose-Based Retrieval Not Working
Problem: Purpose context not influencing retrieval results
Solution: Ensure purpose nodes exist in graph and configure retrieval weights:
# Check if purpose nodes exist in graph
purpose_nodes = [n for n in mg.G.nodes() if n.startswith('p_')]
print(f"Purpose nodes in graph: {purpose_nodes}")
# Configure retrieval weights for purposes
from mrra.retriever.graph_rag import GraphRAGGenerate
retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)
retriever.purpose_weight = 0.6 # Increase purpose influence
retriever.hour_weight = 0.4
retriever.dow_weight = 0.3
# Use purpose in query
docs = retriever.get_relevant_documents({
"user_id": user_id,
"purpose": "dining", # Or ["dining", "work"]
"k": 8
})
# Verify purpose influence
for doc in docs:
print(f"Location: {doc.metadata['node']}, Score: {doc.metadata['score']:.4f}")
Performance and Memory Issues
Slow Activity Purpose Assignment
Problem: Purpose assignment takes too long or uses too much memory
Solution: Optimize concurrency and implement batching:
# Optimize concurrency based on your system
import os
import psutil
def get_optimal_concurrency():
    """Calculate optimal concurrency based on system resources.

    Returns:
        int: a worker count between 1 and 10 -- the smallest of twice the
        CPU count, the whole gigabytes of total RAM, and a hard cap of 10,
        but never less than 1.
    """
    # os.cpu_count() returns None when the count cannot be determined;
    # fall back to one CPU instead of failing on ``None * 2``.
    cpu_count = os.cpu_count() or 1
    memory_gb = psutil.virtual_memory().total / (1024**3)
    # Conservative approach: limit based on memory and CPU
    max_concurrent = min(
        cpu_count * 2,   # CPU-based limit
        int(memory_gb),  # Memory-based limit (1 GB per worker)
        10,              # Hard limit to prevent API overload
    )
    # Machines with under 1 GB of RAM would otherwise yield 0 workers.
    return max(1, max_concurrent)
# Use optimal concurrency
optimal_concurrency = get_optimal_concurrency()
print(f"Using concurrency: {optimal_concurrency}")
acts = ActivityPurposeAssigner(
tb,
llm=llm,
concurrency=optimal_concurrency,
llm_timeout=60 # Reasonable timeout
).assign(acts)
Memory Issues with Large Datasets
Problem: Out of memory errors with large trajectory datasets
Solution: Implement batch processing and memory management:
def process_large_dataset_in_batches(tb, batch_size=1000):
    """Process large trajectory datasets in batches.

    Cuts ``tb.df`` into consecutive windows of ``batch_size`` rows, wraps
    each window in a ``TrajectoryBatch``, extracts activities, assigns
    purposes, and concatenates the per-batch results.

    Args:
        tb: object exposing the trajectory rows as ``tb.df``.
        batch_size: number of rows per batch; must be at least 1.

    Returns:
        list: activities from every batch, in batch order.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    import gc

    # A zero step makes range() fail with an unrelated message and a
    # negative one yields no batches at all, silently dropping the data.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    total_points = len(tb.df)
    results = []
    # NOTE(review): windows are cut by row position, so a stay that spans a
    # window boundary may be split between two batches -- confirm this is
    # acceptable for your data ordering.
    for batch_number, start in enumerate(range(0, total_points, batch_size), start=1):
        batch_df = tb.df.iloc[start:start + batch_size].copy()
        batch_tb = TrajectoryBatch(batch_df)
        print(f"Processing batch {batch_number}: {len(batch_df)} points")
        # Process batch. `llm` is not a parameter: it is looked up in the
        # enclosing (module) scope and must exist before this is called.
        batch_acts = ActivityExtractor(batch_tb).extract()
        batch_acts = ActivityPurposeAssigner(batch_tb, llm=llm, concurrency=2).assign(batch_acts)
        results.extend(batch_acts)
        # Force garbage collection
        gc.collect()
    return results
# Use for large datasets
if len(tb.df) > 10000:
print("Large dataset detected, using batch processing")
acts = process_large_dataset_in_batches(tb, batch_size=2000)
else:
acts = ActivityExtractor(tb).extract()
acts = ActivityPurposeAssigner(tb, llm=llm).assign(acts)
Data Quality Issues
Insufficient Activity Detection
Problem: Too few activities detected from trajectory data
Solution: Adjust extraction parameters:
# Analyze data characteristics first
def analyze_trajectory_characteristics(tb):
    """Analyze trajectory data to suggest parameters.

    Args:
        tb: object whose ``df`` attribute is a DataFrame with ``user_id``,
            ``timestamp_local``, ``latitude`` and ``longitude`` columns.

    Returns:
        dict: always ``total_points``, ``unique_users``, ``time_span_days``
        and ``avg_points_per_user``; additionally ``median_distance_m`` and
        ``90th_percentile_distance_m`` when at least one pair of consecutive
        points exists. Empty input yields zeros instead of raising.
    """
    df = tb.df
    total_points = len(df)
    unique_users = df['user_id'].nunique()

    # Calculate basic statistics. Empty data has no time span, and dividing
    # by zero users would raise ZeroDivisionError.
    if total_points:
        time_span_days = (df['timestamp_local'].max() - df['timestamp_local'].min()).days
    else:
        time_span_days = 0
    stats = {
        'total_points': total_points,
        'unique_users': unique_users,
        'time_span_days': time_span_days,
        'avg_points_per_user': total_points / unique_users if unique_users else 0.0,
    }

    # Calculate typical distances between consecutive points
    distances = []
    for user_id in df['user_id'].unique():
        user_df = df[df['user_id'] == user_id].sort_values('timestamp_local')
        # diff() gives each row minus its predecessor; the first row has no
        # predecessor, so it is dropped.
        lat_diff = user_df['latitude'].diff().iloc[1:]
        lon_diff = user_df['longitude'].diff().iloc[1:]
        # Simple distance calculation (approximate): degrees treated as
        # planar and scaled by 111000.
        step_m = ((lat_diff**2 + lon_diff**2) ** 0.5) * 111000  # Rough meters
        distances.extend(step_m.tolist())
        if len(distances) > 1000:  # Sample to avoid memory issues
            break

    if distances:
        ordered = sorted(distances)  # sort once, index twice
        stats['median_distance_m'] = ordered[len(ordered) // 2]
        stats['90th_percentile_distance_m'] = ordered[int(len(ordered) * 0.9)]
    return stats
# Analyze and adjust parameters
stats = analyze_trajectory_characteristics(tb)
print("Trajectory characteristics:", stats)
# Suggest parameters based on data
suggested_radius = min(500, max(100, stats.get('90th_percentile_distance_m', 300)))
suggested_min_dwell = 20 if stats.get('avg_points_per_user', 0) > 1000 else 10
ext_cfg = dict(
method="radius",
radius_m=suggested_radius,
min_dwell_minutes=suggested_min_dwell,
max_gap_minutes=120 # Increase for sparse data
)
print(f"Suggested extraction config: {ext_cfg}")
acts = ActivityExtractor(tb, **ext_cfg).extract()
print(f"Extracted {len(acts)} activities")
Geolife Dataset Issues
Data Loading Problems
Problem: Issues loading Geolife .plt files
Solution: Verify data structure and use proper loading:
def verify_geolife_structure(data_dir):
    """Report how many ``.plt`` files each user folder under ``data_dir`` holds.

    Every directory directly inside ``data_dir`` is treated as one user.
    Its count is the number of ``*.plt`` entries in its ``Trajectory``
    subfolder, or 0 when that subfolder does not exist. The mapping is
    printed and returned.

    Args:
        data_dir: path of the folder containing the per-user directories.

    Returns:
        dict: user folder name -> number of ``.plt`` files.
    """
    import glob
    import os

    def count_plt_files(user_path):
        # A user folder without "Trajectory" counts as zero files.
        track_dir = os.path.join(user_path, "Trajectory")
        if not os.path.exists(track_dir):
            return 0
        return len(glob.glob(os.path.join(track_dir, "*.plt")))

    expected_structure = {
        os.path.basename(entry): count_plt_files(entry)
        for entry in glob.glob(os.path.join(data_dir, "*"))
        if os.path.isdir(entry)  # plain files next to the user folders are ignored
    }

    print("Geolife data structure:")
    for user_name, plt_count in expected_structure.items():
        print(f" {user_name}: {plt_count} .plt files")
    return expected_structure
# Verify before running scripts
verify_geolife_structure("scripts/Data")
# Run with specific user
import os
os.environ['GEOLIFE_USER'] = 'user_001' # Specify user
Script Execution Issues
Problem: scripts/verify_geolife.py fails to run
Solution: Check environment and dependencies:
# Ensure you're in the right directory
cd /path/to/mrra/project
# Check if script exists
ls scripts/verify_geolife.py
# Run with Python path
python scripts/verify_geolife.py
# If import errors, check installation
pip install -e .
# Check required dependencies
pip list | grep -E "(pandas|numpy|networkx|langchain)"
Best Practices
Development Workflow
def robust_mrra_pipeline(df, llm_cfg, debug=True):
    """Robust MRRA pipeline with error handling and validation.

    Steps: validate the input frame, wrap it in a ``TrajectoryBatch`` and
    derive its cache key, load or compute purpose-labelled activities, load
    or build the mobility graph, then build the agent.

    Args:
        df: trajectory DataFrame; must contain the columns ``user_id``,
            ``timestamp``, ``latitude`` and ``longitude`` and at least one row.
        llm_cfg: keyword arguments for ``make_llm``; also handed unchanged to
            ``build_mrra_agent`` as its ``llm`` argument.
        debug: when true, print progress messages and, on failure, the full
            traceback.

    Returns:
        tuple: ``(agent, components)`` where ``components`` is a dict with
        the keys ``trajectory_batch``, ``activities``, ``mobility_graph`` and
        ``cache_hash``.

    Raises:
        ValueError: required columns are missing or ``df`` has no rows.
        RuntimeError: the LLM connectivity probe failed.
        Exception: anything else raised by a step is reported and re-raised
            unchanged.
    """
    import traceback

    try:
        # 1. Validate input data
        if debug:
            print("🔍 Validating input data...")
        required_cols = ['user_id', 'timestamp', 'latitude', 'longitude']
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")
        if len(df) == 0:
            raise ValueError("Empty trajectory data")

        # 2. Initialize with caching
        tb = TrajectoryBatch(df)
        cm = CacheManager()
        tb_hash = compute_tb_hash(tb)
        if debug:
            print(f"📊 Processing {len(df)} trajectory points for {tb.df['user_id'].nunique()} users")
            print(f"🔑 Cache key: {tb_hash}")

        # 3. Try cached activities first
        activities_key = "robust_pipeline_activities"
        acts = cm.load_activities(tb_hash, activities_key)
        if acts:
            if debug:
                print(f"✅ Loaded {len(acts)} cached activities")
        else:
            if debug:
                print("📊 Computing activities...")
            # Fail fast before the expensive extraction if the LLM is unreachable.
            if not test_llm_connection(llm_cfg):
                raise RuntimeError("LLM connection failed")

            # Extract activities
            ext_cfg = dict(method="radius", radius_m=300, min_dwell_minutes=30)
            acts = ActivityExtractor(tb, **ext_cfg).extract()
            if len(acts) == 0:
                # One retry with a shorter dwell time and a wider radius.
                print("⚠️ No activities detected, adjusting parameters...")
                ext_cfg['min_dwell_minutes'] = 15
                ext_cfg['radius_m'] = 500
                acts = ActivityExtractor(tb, **ext_cfg).extract()

            # Assign purposes (needs a client object, not the config dict)
            llm = make_llm(**llm_cfg)
            acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=4).assign(acts)

            # Cache results
            cm.save_activities(tb_hash, activities_key, acts)
            if debug:
                print(f"💾 Cached {len(acts)} activities")

        # 4. Build graph
        graph_key = "robust_pipeline_graph"
        cfg = GraphConfig(grid_size_m=200, use_activities=True)
        cached_graph = cm.load_graph(tb_hash, graph_key)
        if cached_graph:
            if debug:
                print("✅ Loaded cached mobility graph")
            # Hand over the activities already in hand, as the other branch
            # does; presumably the constructor would otherwise extract and
            # label activities again just to have its graph replaced below.
            mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
            mg.G = cached_graph
        else:
            if debug:
                print("📊 Building mobility graph...")
            mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
            cm.save_graph(tb_hash, graph_key, mg.G)

        # 5. Create agent
        retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)
        reflection_cfg = dict(
            max_round=1,
            subAgents=[
                {"name": "temporal", "prompt": "Select the most likely location id from Options."},
                {"name": "spatial", "prompt": "Select the most likely location id from Options."},
            ],
            aggregator="confidence_weighted_voting",
        )
        agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)
        if debug:
            print("✅ MRRA pipeline completed successfully")
        return agent, {
            'trajectory_batch': tb,
            'activities': acts,
            'mobility_graph': mg,
            'cache_hash': tb_hash,
        }
    except Exception as e:
        print(f"❌ Pipeline failed: {e}")
        if debug:
            traceback.print_exc()
        # Bare raise re-raises the active exception with its traceback intact.
        raise
# Usage with error handling
try:
agent, components = robust_mrra_pipeline(df, llm_cfg, debug=True)
# Test prediction
user_id = components['trajectory_batch'].users()[0]
result = agent.invoke({
"task": "next_position",
"user_id": user_id
})
print("✅ Prediction successful:", result)
except Exception as e:
print(f"❌ Error in MRRA pipeline: {e}")
Common Gotchas:
- Always use `make_llm(**llm_cfg)` to create LLM client objects
- Set `assume_purposes_assigned=True` when using cached activities in graph construction
- Cache activities with purposes to avoid expensive LLM re-computation
- Monitor memory usage with large datasets
- Test LLM connectivity before running expensive operations
Debugging Tools
Debug Mode Configuration
import logging
# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
# MRRA-specific debug flags
debug_config = {
'verbose_activity_extraction': True,
'log_llm_calls': True,
'save_intermediate_results': True,
'validate_graph_construction': True
}
# Use in pipeline
def debug_mrra_pipeline(df, llm_cfg, debug_config):
    """Pipeline with comprehensive debugging.

    Only the verbose activity-extraction report is implemented in this
    excerpt: when ``debug_config['verbose_activity_extraction']`` is truthy,
    the point count and ``timestamp_local`` range of the first user are
    printed. ``llm_cfg`` is accepted but not read here.

    Args:
        df: trajectory data passed to ``TrajectoryBatch``.
        llm_cfg: LLM configuration (unused in this excerpt).
        debug_config: mapping of debug flags, queried with ``.get``.
    """
    verbose = debug_config.get('verbose_activity_extraction')
    if verbose:
        print("🔍 Activity extraction details:")
        batch = TrajectoryBatch(df)
        first_users = batch.users()[:1]  # Debug first user
        for uid in first_users:
            points = batch.for_user(uid)
            print(f" User {uid}: {len(points)} points")
            earliest = points['timestamp_local'].min()
            latest = points['timestamp_local'].max()
            print(f" Time range: {earliest} to {latest}")
# Continue with regular pipeline...
Performance Profiling
import time
import psutil
from functools import wraps
def profile_performance(func):
    """Decorator to profile function performance.

    Each call of the wrapped function prints its wall-clock duration and the
    process's resident memory (RSS, in MB) before and after. The wrapped
    function's return value is passed through unchanged; if it raises,
    nothing is printed and the exception propagates.

    Args:
        func: the callable to profile.

    Returns:
        The wrapping callable, carrying ``func``'s metadata via ``wraps``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        process = psutil.Process()
        start_memory = process.memory_info().rss / 1024 / 1024  # MB
        # perf_counter() is monotonic and high-resolution; time.time()
        # follows the system clock and can jump while the call is running.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start_time
        end_memory = process.memory_info().rss / 1024 / 1024  # MB
        print(f"⏱️ {func.__name__}: {elapsed:.2f}s, Memory: {start_memory:.1f}MB → {end_memory:.1f}MB")
        return result
    return wrapper
# Use profiling
@profile_performance
def profile_activity_extraction(tb):
    """Run ``ActivityExtractor(tb).extract()`` under the profiling decorator."""
    extractor = ActivityExtractor(tb)
    return extractor.extract()
@profile_performance
def profile_purpose_assignment(tb, llm, acts):
    """Run purpose assignment for ``acts`` under the profiling decorator."""
    assigner = ActivityPurposeAssigner(tb, llm=llm)
    return assigner.assign(acts)
# Profile your pipeline
acts = profile_activity_extraction(tb)
acts = profile_purpose_assignment(tb, llm, acts)
Getting Help:
- Check existing GitHub issues for similar problems
- Create minimal reproducible examples when reporting bugs
- Include error messages, stack traces, and environment information
- Test with smaller datasets first to isolate issues
Next Steps
- Review Configuration for advanced setup options
- Explore Development for contributing fixes and improvements
- Check Examples for working code samples