
Mobility Graph

Concepts, structure and usage of the heterogeneous mobility graph

The Mobility Graph is a unified representation of "person-location-time-purpose" relationships using a multi-type heterogeneous graph (networkx.MultiDiGraph). It enables retrieval, reasoning, and interpretable analysis of mobility patterns.
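
Under the hood this is an ordinary networkx.MultiDiGraph whose node ids encode their type by prefix. The hand-built toy graph below is for illustration only; the real graph is produced by MobilityGraph, as shown in Graph Construction:

import networkx as nx

# Toy graph following the same node/edge conventions (illustration only)
toy = nx.MultiDiGraph()
toy.add_node("u_user_123")                         # user node
toy.add_node("g_1234_5678", gy=1234, gx=5678)      # location node (grid cell)
toy.add_node("h_9")                                # hour-of-day node
toy.add_node("p_work", name="Work")                # purpose node

toy.add_edge("u_user_123", "g_1234_5678", weight=120.0)  # user -> location (visit intensity)
toy.add_edge("g_1234_5678", "h_9", weight=120.0)         # location <-> hour co-occurrence
toy.add_edge("h_9", "g_1234_5678", weight=120.0)
toy.add_edge("u_user_123", "p_work", weight=480.0)       # user -> purpose (minutes)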

Graph Structure

Node Types

The mobility graph contains several types of nodes, each representing different aspects of mobility:

User Nodes

Format: u_<user_id>

Represents individual users in the system.

# Example user nodes
"u_user_123"
"u_user_456" 

Location Nodes

Format: g_<gy>_<gx> (grid-based)

Attributes:

  • gy, gx: Grid coordinates
  • lat, lon: Approximate coordinates (when built with activities)
# Example location nodes
"g_1234_5678"  # Grid cell at y=1234, x=5678
"g_2345_6789"  # Another grid cell

Temporal Nodes

Formats:

  • Hour nodes: h_<0..23>
  • Day-of-week nodes: d_<0..6> (0 = Monday)
  • Time-bin nodes: t_<hour>_<dow>

# Example temporal nodes
"h_9"          # 9 AM
"d_1"          # Tuesday  
"t_9_1"        # 9 AM on Tuesday

Purpose Nodes

Format: p_<purpose>

Attributes:

  • name: Purpose description
# Example purpose nodes
"p_work"       # Work activities
"p_dining"     # Dining activities
"p_home"       # Home/residential activities

Edge Types and Weights

The graph contains multiple types of edges representing different relationships:

User-Location Edges

  • Direction: user → location
  • Weight: Visit intensity (count for point-based construction; minutes for activity-based construction)
  • Attributes: activity_type, activity_purpose

Temporal Co-occurrence Edges

  • Direction: Bidirectional (loc ↔ hour/dow/timebin)
  • Weight: Co-occurrence strength (minutes/counts)
  • Purpose: Captures when locations are typically visited

Purpose-Context Edges

  • Direction: Bidirectional (purpose ↔ loc/hour/dow/timebin)
  • Weight: Co-occurrence strength (minutes)
  • Purpose: Links purposes to spatial and temporal contexts

User-Purpose Edges

  • Direction: user → purpose
  • Weight: Total time spent on this purpose (minutes)
  • Purpose: User activity profiling

Transition Edges

  • Location transitions: loc → loc (adjacent activity transitions)
  • Purpose transitions: purpose → purpose (adjacent activity purpose transitions)
  • Attributes: Include user information for personalized analysis
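
All of these relationships live in a single MultiDiGraph, so edge types can be distinguished by the prefixes of their endpoint node ids. A small inspection sketch, assuming G is a graph built as in Graph Construction below:

from collections import Counter

# Count edges by (source-prefix, target-prefix), e.g. ('u', 'g') = user -> location
edge_types = Counter(
    (src.split("_")[0], dst.split("_")[0]) for src, dst in G.edges()
)
for (src_t, dst_t), n in edge_types.most_common():
    print(f"{src_t} -> {dst_t}: {n} edges")

# Inspect the weight and attributes of one user -> location edge
for src, dst, data in G.edges(data=True):
    if src.startswith("u_") and dst.startswith("g_"):
        print(src, "->", dst, data.get("weight"), data.get("activity_purpose"))
        break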

Graph Construction

Basic Construction

from mrra.graph.mobility_graph import MobilityGraph, GraphConfig

# Configure graph parameters
cfg = GraphConfig(
    grid_size_m=200,           # Grid cell size in meters
    min_dwell_minutes=5,       # Minimum dwell time for activities
    use_activities=True        # Use activity-based construction
)

# Build the graph
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
G = mg.G  # Access the networkx graph

Configuration Options

# Detailed configuration example
cfg = GraphConfig(
    grid_size_m=200,              # Grid resolution
    min_dwell_minutes=5,          # Activity detection threshold
    use_activities=True,          # Use activities vs raw points
    include_transitions=True,     # Include loc→loc edges
    purpose_transitions=True,     # Include purpose→purpose edges
    temporal_granularity='hour', # 'hour', 'timebin', or 'both'
    weight_by_duration=True      # Weight edges by time spent
)

Advanced Construction with Custom Activities

# Use pre-computed activities with purposes
from mrra.data.activity import ActivityExtractor
from mrra.analysis.activity_purpose import ActivityPurposeAssigner

# Extract and assign purposes to activities
ext_cfg = dict(method="radius", radius_m=300, min_dwell_minutes=30)
acts = ActivityExtractor(tb, **ext_cfg).extract()
acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=8).assign(acts)

# Build graph with purpose-enriched activities  
cfg = GraphConfig(grid_size_m=200, use_activities=True)
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)

Graph Analysis

Basic Graph Statistics

# Get basic graph information
print(f"Nodes: {G.number_of_nodes()}")
print(f"Edges: {G.number_of_edges()}")

# Analyze node types
node_types = {}
for node in G.nodes():
    node_type = node.split('_')[0]
    node_types[node_type] = node_types.get(node_type, 0) + 1

print("Node type distribution:", node_types)

Centrality Analysis

import networkx as nx

# Calculate centrality measures
degree_centrality = nx.degree_centrality(G)
betweenness_centrality = nx.betweenness_centrality(G, k=min(1000, G.number_of_nodes()))  # sample for large graphs

# Find most central locations
location_nodes = [n for n in G.nodes() if n.startswith('g_')]
top_locations = sorted(
    [(n, degree_centrality[n]) for n in location_nodes], 
    key=lambda x: x[1], 
    reverse=True
)[:10]

print("Most central locations:")
for loc, centrality in top_locations:
    print(f"  {loc}: {centrality:.4f}")

Purpose Analysis

# Analyze purpose patterns
purpose_nodes = [n for n in G.nodes() if n.startswith('p_')]

purpose_stats = {}
for purpose in purpose_nodes:
    # Get connected locations
    connected_locs = [n for n in G.neighbors(purpose) if n.startswith('g_')]
    
    # Get connected times  
    connected_hours = [n for n in G.neighbors(purpose) if n.startswith('h_')]
    
    purpose_stats[purpose] = {
        'locations': len(connected_locs),
        'peak_hours': connected_hours,
        # G is a MultiDiGraph, so sum the weights of all parallel purpose-location edges
        'total_weight': sum(
            d.get('weight', 0)
            for loc in connected_locs
            for d in G[purpose][loc].values()
        )
    }

print("Purpose analysis:")
for purpose, stats in purpose_stats.items():
    print(f"  {purpose}: {stats}")

Graph Retrieval (GraphRAG)

The GraphRAGGenerate class implements graph-based retrieval over the mobility graph, ranking candidate locations by propagating from seed nodes (see the GraphRAG Principle note below):

Basic Retrieval

from mrra.retriever.graph_rag import GraphRAGGenerate

retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)

# Retrieve relevant locations for a query
docs = retriever.get_relevant_documents({
    "user_id": "user_123",
    "t": "2024-09-10 12:30:00",    # Optional timestamp
    "purpose": ["dining", "work"],   # Optional purpose filter  
    "k": 8                          # Number of results
})

for doc in docs:
    print(f"Location: {doc.metadata['node']}, Score: {doc.metadata['score']:.4f}")

Retrieval Configuration

# Configure retrieval weights
retriever.purpose_weight = 0.6   # Purpose seed importance
retriever.hour_weight = 0.5      # Hour context importance  
retriever.dow_weight = 0.3       # Day-of-week importance
retriever.recent_weight = 0.2    # Recent location bias

# Advanced retrieval with multiple seeds
docs = retriever.get_relevant_documents({
    "user_id": "user_123",
    "purpose": "dining",           # Single purpose
    "hour": 12,                   # Explicit hour
    "dow": 1,                     # Tuesday
    "recent_locations": ["g_1234_5678"],  # Recent context
    "k": 10
})

Custom Retrieval Logic

class EnhancedGraphRetriever(GraphRAGGenerate):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
    
    def get_contextual_seeds(self, query):
        """Generate context-aware seeds"""
        seeds = {}
        
        # Base user seed
        if "user_id" in query:
            seeds[f"u_{query['user_id']}"] = 1.0
        
        # Time-based seeds with decay
        if "t" in query:
            from datetime import datetime
            dt = datetime.fromisoformat(query["t"])
            
            # Current hour gets full weight
            seeds[f"h_{dt.hour}"] = self.hour_weight
            
            # Adjacent hours get reduced weight  
            for offset in [-1, 1]:
                adj_hour = (dt.hour + offset) % 24
                seeds[f"h_{adj_hour}"] = self.hour_weight * 0.5
            
            # Day of week
            seeds[f"d_{dt.weekday()}"] = self.dow_weight
        
        # Purpose seeds with expansion
        if "purpose" in query:
            purposes = query["purpose"] if isinstance(query["purpose"], list) else [query["purpose"]]
            
            for purpose in purposes:
                seeds[f"p_{purpose}"] = self.purpose_weight
                
                # Add related purposes (simplified example)
                if purpose == "work":
                    seeds["p_meeting"] = self.purpose_weight * 0.3
                elif purpose == "dining":
                    seeds["p_shopping"] = self.purpose_weight * 0.2
        
        return seeds
    
    def apply_temporal_decay(self, scores, query):
        """Apply temporal decay to scores based on recency"""
        if "recent_locations" not in query:
            return scores
        
        recent_locs = query["recent_locations"]
        decay_factor = 0.9
        
        for loc in recent_locs:
            if loc in scores:
                scores[loc] *= (1 + self.recent_weight * decay_factor)
        
        return scores
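
get_contextual_seeds and apply_temporal_decay above are illustrative helpers; whether the base class invokes such hooks automatically depends on the GraphRAGGenerate internals, so the safest pattern is to call them explicitly:

# Hypothetical usage of the custom helpers defined above
enhanced = EnhancedGraphRetriever(tb=tb, mobility_graph=mg)

query = {"user_id": "user_123", "t": "2024-09-10 12:30:00", "purpose": "dining"}
seeds = enhanced.get_contextual_seeds(query)
print(seeds)

docs = enhanced.get_relevant_documents({**query, "k": 8})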

Graph Persistence and Caching

Saving and Loading Graphs

from mrra.persist.cache import CacheManager

cm = CacheManager()
tb_hash = compute_tb_hash(tb)  # hash of the trajectory table, used as the cache key

# Save graph
cm.save_graph(tb_hash, "mobility_default", mg.G)

# Load graph
cached_graph = cm.load_graph(tb_hash, "mobility_default")
if cached_graph is not None:
    mg.G = cached_graph
    print("Loaded cached graph")

Graph Versioning

# Save with configuration-specific key
config_key = f"mobility_grid{cfg.grid_size_m}_dwell{cfg.min_dwell_minutes}"
cm.save_graph(tb_hash, config_key, mg.G)

# Load specific version
specific_graph = cm.load_graph(tb_hash, config_key)

Visualization and Export

Basic Graph Export

# Export for external analysis
import pickle
import json
from datetime import datetime

# Export as pickle
with open('mobility_graph.pkl', 'wb') as f:
    pickle.dump(mg.G, f)

# Export node/edge lists
nodes_data = []
for node, data in mg.G.nodes(data=True):
    nodes_data.append({'id': node, 'type': node.split('_')[0], **data})

edges_data = []
for src, dst, data in mg.G.edges(data=True):
    edges_data.append({'source': src, 'target': dst, **data})

graph_export = {
    'nodes': nodes_data,
    'edges': edges_data,
    'metadata': {
        'grid_size_m': cfg.grid_size_m,
        'construction_time': str(datetime.now()),
        'node_count': mg.G.number_of_nodes(),
        'edge_count': mg.G.number_of_edges()
    }
}

with open('mobility_graph.json', 'w') as f:
    json.dump(graph_export, f, indent=2)
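
For tools such as Gephi, networkx can also write GraphML directly. GraphML only supports scalar attribute values, so the sketch below (not part of MRRA) filters out anything else first:

import networkx as nx

# Keep only GraphML-serializable attribute values (str, int, float, bool)
H = nx.MultiDiGraph()
H.add_nodes_from(
    (n, {k: v for k, v in d.items() if isinstance(v, (str, int, float, bool))})
    for n, d in mg.G.nodes(data=True)
)
H.add_edges_from(
    (u, v, {k: w for k, w in d.items() if isinstance(w, (str, int, float, bool))})
    for u, v, d in mg.G.edges(data=True)
)

nx.write_graphml(H, "mobility_graph.graphml")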

GraphRAG Principle: user, hour, day-of-week, and purpose nodes act as "seeds"; their scores propagate along weighted edges (scaled by per-seed-type weights) to location nodes. This yields a semantically informed location ranking, with a recency bias contributed by recently visited locations.
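
The exact propagation used by GraphRAGGenerate is internal to MRRA; as an approximation, the same idea can be expressed as a personalized PageRank seeded with the weighted seed nodes (a sketch, not the library's implementation):

import networkx as nx

# Example seed weights; keep only seeds that exist in the graph
seeds = {"u_user_123": 1.0, "h_12": 0.5, "d_1": 0.3, "p_dining": 0.6}
seeds = {n: w for n, w in seeds.items() if n in G}

if seeds:
    # Collapse parallel edges into a simple DiGraph for PageRank
    scores = nx.pagerank(nx.DiGraph(G), personalization=seeds, weight="weight")
    top_locations = sorted(
        ((n, s) for n, s in scores.items() if n.startswith("g_")),
        key=lambda x: x[1],
        reverse=True,
    )[:10]
    for node, score in top_locations:
        print(node, round(score, 5))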

Performance Considerations

Large Graph Optimization

# For large datasets, consider subgraph extraction
def extract_user_subgraph(G, user_id, hops=2):
    """Extract subgraph around a specific user"""
    import networkx as nx
    
    user_node = f"u_{user_id}"
    if user_node not in G:
        return nx.MultiDiGraph()
    
    # Get nodes within n hops
    subgraph_nodes = set([user_node])
    current_nodes = {user_node}
    
    for _ in range(hops):
        next_nodes = set()
        for node in current_nodes:
            # neighbors() on a directed graph follows outgoing edges only;
            # use G.to_undirected(as_view=True) to expand in both directions
            next_nodes.update(G.neighbors(node))
        subgraph_nodes.update(next_nodes)
        current_nodes = next_nodes
    
    return G.subgraph(subgraph_nodes).copy()

# Use subgraph for user-specific analysis
user_subgraph = extract_user_subgraph(mg.G, "user_123")

Memory Management

# Reduce weight precision (mainly shrinks serialized exports; in-memory Python floats are fixed-size)
def compress_graph_weights(G, precision=3):
    """Round edge weights to a fixed decimal precision"""
    for src, dst, key, data in G.edges(data=True, keys=True):
        if 'weight' in data:
            G[src][dst][key]['weight'] = round(data['weight'], precision)
    return G

# Apply compression
mg.G = compress_graph_weights(mg.G)

Next Steps