Mobility Graph
Concepts, structure and usage of the heterogeneous mobility graph
The Mobility Graph is a unified representation of "person-location-time-purpose" relationships using a multi-type heterogeneous graph (networkx.MultiDiGraph). It enables retrieval, reasoning, and interpretable analysis of mobility patterns.
Graph Structure
Node Types
The mobility graph contains several types of nodes, each representing different aspects of mobility:
User Nodes
Format: u_<user_id>
Represents individual users in the system.
# Example user nodes
"u_user_123"
"u_user_456"
Location Nodes
Format: g_<gy>_<gx> (grid-based)
Attributes:
gy, gx: Grid coordinates
lat, lon: Approximate coordinates (when built with activities)
# Example location nodes
"g_1234_5678" # Grid cell at y=1234, x=5678
"g_2345_6789" # Another grid cell
Time Nodes
Hour nodes: h_<0..23>
Day-of-week nodes: d_<0..6> (0=Monday)
Time-bin nodes: t_<hour>_<dow>
# Example temporal nodes
"h_9" # 9 AM
"d_1" # Tuesday
"t_9_1" # 9 AM on Tuesday
Purpose Nodes
Format: p_<purpose>
Attributes:
name: Purpose description
# Example purpose nodes
"p_work" # Work activities
"p_dining" # Dining activities
"p_home" # Home/residential activities
Edge Types and Weights
The graph contains multiple types of edges representing different relationships:
User-Location Edges
- Direction: user → location
- Weight: Visit intensity (count for point-based construction; minutes for activity-based construction)
- Attributes:
activity_type, activity_purpose
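The two weighting modes can be sketched as follows. This is a minimal illustration, not the construction code MobilityGraph actually runs: the helper `add_visit` and the edge key `"visit"` are hypothetical names chosen for the example.

```python
import networkx as nx

def add_visit(G, user_id, loc_node, minutes=None):
    """Accumulate the weight of a user -> location edge.

    minutes=None mimics point-based construction (weight counts visits);
    a number mimics activity-based construction (weight sums dwell minutes).
    """
    user_node = f"u_{user_id}"
    increment = 1.0 if minutes is None else float(minutes)
    # A fixed edge key (hypothetical) keeps one edge per pair; without it,
    # a MultiDiGraph would add a new parallel edge on every call.
    if G.has_edge(user_node, loc_node, key="visit"):
        G[user_node][loc_node]["visit"]["weight"] += increment
    else:
        G.add_edge(user_node, loc_node, key="visit", weight=increment)

# Point-based: three observed points in the same grid cell
points = nx.MultiDiGraph()
for _ in range(3):
    add_visit(points, "user_123", "g_1234_5678")

# Activity-based: two stays of 30 and 45 minutes
stays = nx.MultiDiGraph()
for minutes in (30, 45):
    add_visit(stays, "user_123", "g_1234_5678", minutes=minutes)

count_weight = points["u_user_123"]["g_1234_5678"]["visit"]["weight"]
minute_weight = stays["u_user_123"]["g_1234_5678"]["visit"]["weight"]
print(count_weight, minute_weight)
```

The same edge therefore means "number of visits" in one mode and "minutes spent" in the other, so weights from graphs built in different modes should not be compared directly.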
Temporal Co-occurrence Edges
- Direction: Bidirectional (loc ↔ hour/dow/timebin)
- Weight: Co-occurrence strength (minutes/counts)
- Purpose: Captures when locations are typically visited
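To make the temporal node identifiers concrete, the sketch below derives the hour, day-of-week and time-bin nodes for a timestamp and accumulates co-occurrence minutes per (location, time node) pair. The helper names and the (location, start, minutes) input shape are assumptions for illustration. The whole stay is attributed to its starting hour, which the real construction may handle differently.

```python
from collections import Counter
from datetime import datetime

def time_nodes(ts):
    """Return the hour, day-of-week and time-bin node ids for a datetime."""
    dow = ts.weekday()  # 0 = Monday, matching the d_<0..6> convention
    return [f"h_{ts.hour}", f"d_{dow}", f"t_{ts.hour}_{dow}"]

def cooccurrence_weights(visits):
    """Sum dwell minutes per (location node, time node) pair.

    visits: iterable of (loc_node, start_iso, minutes) tuples (assumed shape).
    """
    weights = Counter()
    for loc_node, start_iso, minutes in visits:
        start = datetime.fromisoformat(start_iso)
        for node in time_nodes(start):
            weights[(loc_node, node)] += minutes
    return weights

visits = [
    ("g_1234_5678", "2024-09-10 09:15:00", 30),  # a Tuesday morning
    ("g_1234_5678", "2024-09-10 09:50:00", 20),
    ("g_2345_6789", "2024-09-10 12:30:00", 45),
]
weights = cooccurrence_weights(visits)
for pair, minutes in sorted(weights.items()):
    print(pair, minutes)
```

In the graph, each pair would be stored as two directed edges, one per direction, carrying the same weight.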
Purpose-Context Edges
- Direction: Bidirectional (purpose ↔ loc/hour/dow/timebin)
- Weight: Co-occurrence strength (minutes)
- Purpose: Links purposes to spatial and temporal contexts
User-Purpose Edges
- Direction: user → purpose
- Weight: Total time spent on this purpose (minutes)
- Purpose: User activity profiling
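As an illustration of user activity profiling, the sketch below turns user → purpose edge weights into time shares. The function `purpose_profile` is a hypothetical helper, not part of the mrra API, and it assumes every such edge carries a `weight` attribute in minutes.

```python
import networkx as nx

def purpose_profile(G, user_id):
    """Return each purpose's share of a user's total recorded minutes."""
    user_node = f"u_{user_id}"
    if user_node not in G:
        return {}
    totals = {}
    # out_edges yields one entry per parallel edge, so weights add up correctly
    for _, dst, data in G.out_edges(user_node, data=True):
        if dst.startswith("p_"):
            totals[dst] = totals.get(dst, 0.0) + data.get("weight", 0.0)
    grand_total = sum(totals.values())
    if grand_total <= 0:
        return {}
    return {purpose: w / grand_total for purpose, w in totals.items()}

# Toy graph: 300 minutes of work, 100 minutes of dining
toy = nx.MultiDiGraph()
toy.add_edge("u_user_123", "p_work", weight=300)
toy.add_edge("u_user_123", "p_dining", weight=100)
toy.add_edge("u_user_123", "g_1234_5678", weight=400)  # ignored: not a purpose

profile = purpose_profile(toy, "user_123")
print(profile)
```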
Transition Edges
- Location transitions: loc → loc (adjacent activity transitions)
- Purpose transitions: purpose → purpose (adjacent activity purpose transitions)
- Attributes: Include user information for personalized analysis
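Transition edges come from consecutive activities of the same user. The sketch below counts them from a flat activity list. The tuple shape (user, start, location node, purpose) and the function name are assumptions for illustration, and timestamps are assumed to share one ISO format so that string sorting is chronological.

```python
from collections import Counter

def count_transitions(activities):
    """Count loc -> loc and purpose -> purpose transitions per user.

    activities: iterable of (user_id, start_iso, loc_node, purpose) tuples.
    """
    per_user = {}
    # Sort by user, then start time, so neighbours in each list are adjacent in time
    for user_id, _start, loc_node, purpose in sorted(activities, key=lambda a: (a[0], a[1])):
        per_user.setdefault(user_id, []).append((loc_node, purpose))

    loc_transitions, purpose_transitions = Counter(), Counter()
    for user_id, sequence in per_user.items():
        for (loc_a, pur_a), (loc_b, pur_b) in zip(sequence, sequence[1:]):
            # Keep the user in the key to support personalized analysis
            loc_transitions[(user_id, loc_a, loc_b)] += 1
            purpose_transitions[(user_id, f"p_{pur_a}", f"p_{pur_b}")] += 1
    return loc_transitions, purpose_transitions

activities = [
    ("user_123", "2024-09-10 08:00:00", "g_1_1", "home"),
    ("user_123", "2024-09-10 12:30:00", "g_3_3", "dining"),
    ("user_123", "2024-09-10 09:00:00", "g_2_2", "work"),
    ("user_456", "2024-09-10 10:00:00", "g_2_2", "work"),
]
loc_tr, pur_tr = count_transitions(activities)
print(dict(loc_tr))
print(dict(pur_tr))
```

Grouping by user before pairing ensures that one user's last activity is never linked to another user's first.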
Graph Construction
Basic Construction
from mrra.graph.mobility_graph import MobilityGraph, GraphConfig
# Configure graph parameters
cfg = GraphConfig(
    grid_size_m=200,        # Grid cell size in meters
    min_dwell_minutes=5,    # Minimum dwell time for activities
    use_activities=True,    # Use activity-based construction
)
# Build the graph (tb and acts must already be defined)
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
G = mg.G  # Access the networkx graph
Configuration Options
# Detailed configuration example
cfg = GraphConfig(
    grid_size_m=200,              # Grid resolution
    min_dwell_minutes=5,          # Activity detection threshold
    use_activities=True,          # Use activities vs raw points
    include_transitions=True,     # Include loc→loc edges
    purpose_transitions=True,     # Include purpose→purpose edges
    temporal_granularity='hour',  # 'hour', 'timebin', or 'both'
    weight_by_duration=True,      # Weight edges by time spent
)
Advanced Construction with Custom Activities
# Use pre-computed activities with purposes
from mrra.data.activity import ActivityExtractor
from mrra.analysis.activity_purpose import ActivityPurposeAssigner
# Extract and assign purposes to activities
ext_cfg = dict(method="radius", radius_m=300, min_dwell_minutes=30)
acts = ActivityExtractor(tb, **ext_cfg).extract()
acts = ActivityPurposeAssigner(tb, llm=llm, concurrency=8).assign(acts)
# Build graph with purpose-enriched activities
cfg = GraphConfig(grid_size_m=200, use_activities=True)
mg = MobilityGraph(tb, cfg, activities=acts, assume_purposes_assigned=True)
Graph Analysis
Basic Graph Statistics
# Get basic graph information
print(f"Nodes: {G.number_of_nodes()}")
print(f"Edges: {G.number_of_edges()}")
# Analyze node types (the prefix before the first underscore encodes the type)
node_types = {}
for node in G.nodes():
    node_type = node.split('_')[0]
    node_types[node_type] = node_types.get(node_type, 0) + 1
print("Node type distribution:", node_types)
Centrality Analysis
import networkx as nx
# Calculate centrality measures
degree_centrality = nx.degree_centrality(G)
# Sample at most 1000 pivot nodes on large graphs; k must not exceed the node count
k = min(1000, G.number_of_nodes())
betweenness_centrality = nx.betweenness_centrality(G, k=k)
# Find most central locations
location_nodes = [n for n in G.nodes() if n.startswith('g_')]
top_locations = sorted(
    [(n, degree_centrality[n]) for n in location_nodes],
    key=lambda x: x[1],
    reverse=True
)[:10]
print("Most central locations:")
for loc, centrality in top_locations:
    print(f"  {loc}: {centrality:.4f}")
Purpose Analysis
# Analyze purpose patterns
purpose_nodes = [n for n in G.nodes() if n.startswith('p_')]
purpose_stats = {}
for purpose in purpose_nodes:
    # Get connected locations
    connected_locs = [n for n in G.neighbors(purpose) if n.startswith('g_')]
    # Get connected times
    connected_hours = [n for n in G.neighbors(purpose) if n.startswith('h_')]
    purpose_stats[purpose] = {
        'locations': len(connected_locs),
        'peak_hours': connected_hours,
        # In a MultiDiGraph, G[u][v] maps edge keys to attribute dicts,
        # so sum the weight over all parallel edges
        'total_weight': sum(
            data.get('weight', 0)
            for loc in connected_locs
            for data in G[purpose][loc].values()
        )
    }
print("Purpose analysis:")
for purpose, stats in purpose_stats.items():
    print(f"  {purpose}: {stats}")
Graph Retrieval (GraphRAG)
The GraphRAGGenerate class ranks location nodes by propagating weights from query-derived seed nodes (user, time, purpose) through the graph:
Basic Retrieval
from mrra.retriever.graph_rag import GraphRAGGenerate
retriever = GraphRAGGenerate(tb=tb, mobility_graph=mg)
# Retrieve relevant locations for a query
docs = retriever.get_relevant_documents({
    "user_id": "user_123",
    "t": "2024-09-10 12:30:00",     # Optional timestamp
    "purpose": ["dining", "work"],  # Optional purpose filter
    "k": 8                          # Number of results
})
for doc in docs:
    print(f"Location: {doc.metadata['node']}, Score: {doc.metadata['score']:.4f}")
Retrieval Configuration
# Configure retrieval weights
retriever.purpose_weight = 0.6 # Purpose seed importance
retriever.hour_weight = 0.5 # Hour context importance
retriever.dow_weight = 0.3 # Day-of-week importance
retriever.recent_weight = 0.2 # Recent location bias
# Advanced retrieval with multiple seeds
docs = retriever.get_relevant_documents({
    "user_id": "user_123",
    "purpose": "dining",                  # Single purpose
    "hour": 12,                           # Explicit hour
    "dow": 1,                             # Tuesday
    "recent_locations": ["g_1234_5678"],  # Recent context
    "k": 10
})
Custom Retrieval Logic
from datetime import datetime
class EnhancedGraphRetriever(GraphRAGGenerate):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
    def get_contextual_seeds(self, query):
        """Generate context-aware seeds"""
        seeds = {}
        # Base user seed
        if "user_id" in query:
            seeds[f"u_{query['user_id']}"] = 1.0
        # Time-based seeds with decay
        if "t" in query:
            dt = datetime.fromisoformat(query["t"])
            # Current hour gets full weight
            seeds[f"h_{dt.hour}"] = self.hour_weight
            # Adjacent hours get reduced weight
            for offset in [-1, 1]:
                adj_hour = (dt.hour + offset) % 24
                seeds[f"h_{adj_hour}"] = self.hour_weight * 0.5
            # Day of week
            seeds[f"d_{dt.weekday()}"] = self.dow_weight
        # Purpose seeds with expansion
        if "purpose" in query:
            purposes = query["purpose"] if isinstance(query["purpose"], list) else [query["purpose"]]
            for purpose in purposes:
                seeds[f"p_{purpose}"] = self.purpose_weight
                # Add related purposes (simplified example); setdefault keeps an
                # explicitly requested purpose at its full weight
                if purpose == "work":
                    seeds.setdefault("p_meeting", self.purpose_weight * 0.3)
                elif purpose == "dining":
                    seeds.setdefault("p_shopping", self.purpose_weight * 0.2)
        return seeds
    def apply_temporal_decay(self, scores, query):
        """Apply temporal decay to scores based on recency"""
        if "recent_locations" not in query:
            return scores
        recent_locs = query["recent_locations"]
        decay_factor = 0.9
        for loc in recent_locs:
            if loc in scores:
                scores[loc] *= (1 + self.recent_weight * decay_factor)
        return scores
Graph Persistence and Caching
Saving and Loading Graphs
from mrra.persist.cache import CacheManager
# compute_tb_hash is not defined in this snippet; import it from wherever your mrra version provides it
cm = CacheManager()
tb_hash = compute_tb_hash(tb)
# Save graph
cm.save_graph(tb_hash, "mobility_default", mg.G)
# Load graph
cached_graph = cm.load_graph(tb_hash, "mobility_default")
if cached_graph:
    mg.G = cached_graph
    print("Loaded cached graph")
Graph Versioning
# Save with configuration-specific key
config_key = f"mobility_grid{cfg.grid_size_m}_dwell{cfg.min_dwell_minutes}"
cm.save_graph(tb_hash, config_key, mg.G)
# Load specific version
specific_graph = cm.load_graph(tb_hash, config_key)
Visualization and Export
Basic Graph Export
# Export for external analysis
import json
import pickle
from datetime import datetime
# Export as pickle
with open('mobility_graph.pkl', 'wb') as f:
    pickle.dump(mg.G, f)
# Export node/edge lists
nodes_data = []
for node, data in mg.G.nodes(data=True):
    nodes_data.append({'id': node, 'type': node.split('_')[0], **data})
edges_data = []
for src, dst, data in mg.G.edges(data=True):
    edges_data.append({'source': src, 'target': dst, **data})
graph_export = {
    'nodes': nodes_data,
    'edges': edges_data,
    'metadata': {
        'grid_size_m': cfg.grid_size_m,
        'construction_time': str(datetime.now()),
        'node_count': mg.G.number_of_nodes(),
        'edge_count': mg.G.number_of_edges()
    }
}
with open('mobility_graph.json', 'w') as f:
    # default=str stringifies attribute values that json cannot encode natively
    json.dump(graph_export, f, indent=2, default=str)
GraphRAG Principle: User, hour, day-of-week, and purpose nodes act as "seeds". Their weights propagate to location nodes through edge weights, scaled by a per-seed-type weight. The resulting location ranking reflects who is asking, when, and for what purpose, and it also incorporates a recency bias from recent locations.
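The principle can be illustrated with a one-hop propagation sketch. This is not the algorithm GraphRAGGenerate actually runs, which this page does not spell out. The function `propagate_seeds`, the per-seed normalization and the multiplicative recency boost are all assumptions made for the example.

```python
import networkx as nx

def propagate_seeds(G, seeds, recent_locations=(), recent_weight=0.2, k=5):
    """Score location nodes by pushing seed weights along outgoing edges."""
    scores = {}
    for seed, seed_weight in seeds.items():
        if seed not in G:
            continue
        # Sum parallel edges so each location gets one weight per seed
        to_locations = {}
        for _, dst, data in G.out_edges(seed, data=True):
            if dst.startswith("g_"):
                to_locations[dst] = to_locations.get(dst, 0.0) + data.get("weight", 1.0)
        total = sum(to_locations.values())
        if total <= 0:
            continue
        # Normalize so a seed distributes exactly seed_weight over its locations
        for loc, weight in to_locations.items():
            scores[loc] = scores.get(loc, 0.0) + seed_weight * weight / total
    # Recency bias: boost locations the user visited recently
    for loc in recent_locations:
        if loc in scores:
            scores[loc] *= 1 + recent_weight
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)[:k]

toy = nx.MultiDiGraph()
toy.add_edge("u_user_123", "g_1_1", weight=3)   # user mostly visits g_1_1
toy.add_edge("u_user_123", "g_2_2", weight=1)
toy.add_edge("p_dining", "g_2_2", weight=10)    # dining happens at g_2_2

seeds = {"u_user_123": 1.0, "p_dining": 0.6}
ranked = propagate_seeds(toy, seeds)
ranked_recent = propagate_seeds(toy, seeds, recent_locations=["g_1_1"])
print(ranked)
print(ranked_recent)
```

In this toy graph the dining seed lifts g_2_2 above the user's most visited cell, and adding g_1_1 as a recent location flips the order back, which is the interplay of semantic seeds and recency bias described above.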
Performance Considerations
Large Graph Optimization
# For large datasets, consider subgraph extraction
import networkx as nx
def extract_user_subgraph(G, user_id, hops=2):
    """Extract subgraph around a specific user"""
    user_node = f"u_{user_id}"
    if user_node not in G:
        return nx.MultiDiGraph()
    # Get nodes within n hops (G.neighbors follows outgoing edges only)
    subgraph_nodes = {user_node}
    current_nodes = {user_node}
    for _ in range(hops):
        next_nodes = set()
        for node in current_nodes:
            next_nodes.update(G.neighbors(node))
        subgraph_nodes.update(next_nodes)
        current_nodes = next_nodes
    return G.subgraph(subgraph_nodes).copy()
# Use subgraph for user-specific analysis
user_subgraph = extract_user_subgraph(mg.G, "user_123")
Memory Management
# Rounding weights shrinks text exports such as JSON; a Python float
# occupies the same memory whatever its precision
def compress_graph_weights(G, precision=3):
    """Round edge weights in place to reduce the size of serialized exports"""
    for src, dst, key, data in G.edges(data=True, keys=True):
        if 'weight' in data:
            # data is the live attribute dict, so this updates the graph
            data['weight'] = round(data['weight'], precision)
    return G
# Apply compression
mg.G = compress_graph_weights(mg.G)
Next Steps
- Learn about Multi-Agent Reflection to leverage graph insights
- Explore MCP Integration for real-time graph enrichment
- Check out Caching and Performance for optimization strategies