
Multi-Agent Reflection

Principles and configuration of multi-agent reflection systems

Multi-agent reflection is a core mechanism in MRRA that uses multiple specialized sub-agents to collaboratively solve mobility prediction problems. Each sub-agent contributes a different area of expertise and perspective, and their outputs are then aggregated into robust, interpretable results.

Core Concept

The multi-agent reflection system works through these steps:

  1. Multiple Sub-Agents: Each sub-agent specializes in a different aspect (temporal, spatial, routing, purpose-based)
  2. Structured Responses: Each sub-agent returns a selection (or path_ids), a confidence score, and a rationale (see the sketch after this list)
  3. Aggregation: An aggregator combines responses using various strategies
  4. Tool Integration: Sub-agents can access external tools via MCP protocol
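
A structured sub-agent response is a small dict-like object; a minimal sketch, assuming the field names described above (the exact schema may vary by MRRA version):

subagent_response = {
    "selection": "loc_017",   # chosen location id from Options ("path_ids" for route tasks)
    "confidence": 0.82,       # self-reported confidence in [0, 1]
    "rationale": "User is usually at this location on weekdays around noon.",
}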

Key Benefits:

  • Robustness: Multiple perspectives reduce single-point failures
  • Interpretability: Each sub-agent provides reasoning rationale
  • Extensibility: Easy to add new specialized agents or tools

Basic Configuration

Simple Two-Agent Setup

from mrra.agents.builder import build_mrra_agent

# Basic reflection configuration
reflection_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "temporal", 
            "prompt": "Select the most likely location id from Options (selection), do not output coordinates."
        },
        {
            "name": "spatial",  
            "prompt": "Select the most likely location id from Options (selection), do not output coordinates."
        },
    ],
    aggregator="confidence_weighted_voting",
)

# Build agent with reflection
agent = build_mrra_agent(
    llm=llm_cfg, 
    retriever=retriever, 
    reflection=reflection_cfg
)

# Execute prediction
result = agent.invoke({
    "task": "next_position", 
    "user_id": "user_123", 
    "t": "2024-09-10 12:30:00"
})
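
The returned result mirrors the fields emitted by the aggregator (see Aggregation Strategies below). A minimal way to inspect it, assuming a dict-like result with the default confidence-weighted fields:

# Field names assume the default confidence_weighted_voting output
print(result.get("selection"))    # predicted location id
print(result.get("confidence"))   # aggregated confidence score
print(result.get("rationale"))    # human-readable explanation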

Advanced Multi-Agent Configuration

# Advanced configuration with specialized agents
advanced_reflection_cfg = dict(
    max_round=2,  # Allow multiple rounds of reflection
    subAgents=[
        {
            "name": "temporal_expert",
            "prompt": """You are a temporal mobility expert. Analyze time patterns and select the most likely location based on:
            - Hour-of-day preferences
            - Day-of-week patterns  
            - Historical timing patterns
            Select from Options (selection) and provide confidence score.""",
            "temperature": 0.1,  # Lower temperature for consistency
        },
        {
            "name": "spatial_expert", 
            "prompt": """You are a spatial mobility expert. Analyze location patterns and select the most likely location based on:
            - Spatial clustering and hotspots
            - Distance and accessibility
            - Geographic context
            Select from Options (selection) and provide confidence score.""",
            "temperature": 0.1,
        },
        {
            "name": "purpose_expert",
            "prompt": """You are a purpose-driven mobility expert. Analyze activity purposes and select the most likely location based on:
            - Activity purpose patterns
            - Sequential activity logic
            - Context-appropriate locations
            Select from Options (selection) and provide confidence score.""",
            "temperature": 0.2,
        },
        {
            "name": "routing_expert",
            "prompt": """You are a routing and transition expert. Analyze movement patterns and select the most likely location based on:
            - Common transition routes
            - Movement efficiency
            - Sequential location logic
            Select from Options (selection) and provide confidence score.""",
            "temperature": 0.15,
        }
    ],
    aggregator="confidence_weighted_voting",
    consensus_threshold=0.7,  # Require high consensus for final decision
)

Sub-Agent Specializations

Temporal Sub-Agents

Focus on time-based patterns and preferences:

temporal_agents = [
    {
        "name": "hourly_expert",
        "prompt": """Analyze hourly mobility patterns:
        - Peak activity hours (7-9am, 12-2pm, 5-7pm)
        - Off-peak behavior patterns
        - Lunch break timing (11am-2pm)
        - Evening routines (6-10pm)

        Consider the current time context and select the most temporally appropriate location."""
    },
    {
        "name": "weekly_expert",
        "prompt": """Analyze weekly mobility patterns:
        - Weekday vs weekend behavior
        - Monday morning patterns
        - Friday evening patterns
        - Weekend leisure activities

        Consider day-of-week context and select accordingly."""
    },
    {
        "name": "seasonal_expert",
        "prompt": """Analyze seasonal and calendar patterns:
        - Weather impact on mobility
        - Holiday and special event patterns
        - Seasonal activity preferences
        - Academic calendar influences

        Consider seasonal context in your selection."""
    }
]

Spatial Sub-Agents

Focus on geographic and location-based analysis:

spatial_agents = [
    {
        "name": "clustering_expert",
        "prompt": """Analyze spatial clustering patterns:
        - Identify activity hotspots and clusters
        - Consider spatial density and accessibility
        - Evaluate distance relationships
        - Assess geographic constraints"""
    },
    {
        "name": "poi_expert", 
        "prompt": """Analyze Point-of-Interest patterns:
        - Consider POI categories and purposes
        - Evaluate business hours and availability
        - Assess POI popularity and capacity
        - Consider service accessibility"""
    },
    {
        "name": "network_expert",
        "prompt": """Analyze transportation network factors:
        - Consider route accessibility
        - Evaluate travel time and distance
        - Assess transportation mode preferences  
        - Consider network connectivity"""
    }
]

Purpose-Driven Sub-Agents

Focus on activity purposes and contextual logic:

purpose_agents = [
    {
        "name": "activity_sequence_expert",
        "prompt": """Analyze activity sequence logic:
        - Common activity transitions (work→dining→work→home)
        - Purpose-driven location selection
        - Activity duration expectations
        - Sequential consistency"""
    },
    {
        "name": "lifestyle_expert",
        "prompt": """Analyze lifestyle and behavioral patterns:
        - Personal preferences and habits
        - Routine vs spontaneous behavior
        - Social and cultural factors
        - Individual mobility style"""
    }
]
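
These specialized lists can be combined when building the reflection configuration. A minimal sketch that reuses the spatial_agents and purpose_agents defined above with the configuration keys shown earlier:

combined_reflection_cfg = dict(
    max_round=1,
    subAgents=spatial_agents + purpose_agents,
    aggregator="confidence_weighted_voting",
)

agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=combined_reflection_cfg)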

Aggregation Strategies

Confidence Weighted Voting

The default aggregation strategy weights each sub-agent's vote by its confidence score:

def confidence_weighted_voting(subagent_responses):
    """Aggregate responses using confidence-weighted voting"""
    
    location_scores = {}
    total_confidence = 0
    
    for response in subagent_responses:
        selection = response.get('selection')
        confidence = response.get('confidence', 0.5)
        
        if selection:
            location_scores[selection] = location_scores.get(selection, 0) + confidence
            total_confidence += confidence
    
    if not location_scores or total_confidence == 0:
        return None
    
    # Normalize scores so they sum to 1 across candidate locations
    for location in location_scores:
        location_scores[location] /= total_confidence
    
    # Select the highest-scoring location
    best_location = max(location_scores.items(), key=lambda x: x[1])
    
    return {
        'selection': best_location[0],
        'confidence': best_location[1],
        'scores': location_scores,
        'rationale': f"Selected from {len(subagent_responses)} agent votes with weighted score {best_location[1]:.3f}"
    }
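
A quick sanity check with hypothetical responses (values are made up for illustration):

# Hypothetical sub-agent responses for illustration only
sample_responses = [
    {"selection": "loc_A", "confidence": 0.8, "rationale": "matches weekday lunch pattern"},
    {"selection": "loc_B", "confidence": 0.4, "rationale": "nearby POI cluster"},
    {"selection": "loc_A", "confidence": 0.7, "rationale": "common transition from work"},
]

aggregated = confidence_weighted_voting(sample_responses)
print(aggregated["selection"])   # loc_A (weighted score 1.5 / 1.9 ≈ 0.789)
print(aggregated["scores"])      # normalized score per candidate location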

Custom Aggregation Strategies

class ConsensusAggregator:
    def __init__(self, consensus_threshold=0.6):
        self.consensus_threshold = consensus_threshold
    
    def aggregate(self, subagent_responses):
        """Require consensus above threshold"""
        
        selections = [r.get('selection') for r in subagent_responses if r.get('selection')]
        
        if not selections:
            return None
        
        # Count selections
        selection_counts = {}
        for selection in selections:
            selection_counts[selection] = selection_counts.get(selection, 0) + 1
        
        # Check for consensus
        total_agents = len(subagent_responses)
        for selection, count in selection_counts.items():
            if count / total_agents >= self.consensus_threshold:
                avg_confidence = sum(
                    r.get('confidence', 0.5) for r in subagent_responses
                    if r.get('selection') == selection
                ) / count
                
                return {
                    'selection': selection,
                    'confidence': avg_confidence,
                    'consensus_ratio': count / total_agents,
                    'rationale': f"Consensus reached by {count}/{total_agents} agents"
                }
        
        # No consensus reached
        return {
            'selection': max(selection_counts.items(), key=lambda x: x[1])[0],
            'confidence': 0.3,  # Low confidence due to lack of consensus
            'consensus_ratio': max(selection_counts.values()) / total_agents,
            'rationale': "No strong consensus, selected most popular choice"
        }
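
Exercising the custom aggregator directly on the same response format (again with made-up values):

# Hypothetical responses: two of three agents agree on loc_A
aggregator = ConsensusAggregator(consensus_threshold=0.6)
decision = aggregator.aggregate([
    {"selection": "loc_A", "confidence": 0.9},
    {"selection": "loc_A", "confidence": 0.7},
    {"selection": "loc_B", "confidence": 0.6},
])
print(decision["selection"], decision["consensus_ratio"])  # loc_A 0.667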

Tool Integration via MCP

Sub-agents can access external tools through the MCP (Model Context Protocol) framework:

Weather Tool Integration

reflection_cfg_with_weather = dict(
    max_round=1,
    subAgents=[
        {
            "name": "weather_aware_agent",
            "prompt": """Consider weather conditions in your location selection:
            - Indoor vs outdoor activity preferences
            - Weather-appropriate locations
            - Seasonal activity adjustments
            Select from Options and explain weather impact.""",
            "mcp": {
                "weather": {}  # Enable weather tool
            }
        },
        {
            "name": "baseline_agent",
            "prompt": "Select based on historical patterns without external context."
        }
    ],
    aggregator="confidence_weighted_voting"
)

Maps and POI Integration

reflection_cfg_with_maps = dict(
    max_round=1,
    subAgents=[
        {
            "name": "poi_aware_agent", 
            "prompt": """Use nearby POI information to inform location selection:
            - Business hours and availability
            - POI categories and services
            - Popular destinations
            Select from Options using POI context.""",
            "mcp": {
                "maps": {},  # Enable POI lookup tool
                "gmap": {    # Enable remote map service
                    "url": "https://your-mcp-server/sse",
                    "transport": "sse"
                }
            }
        }
    ],
    aggregator="confidence_weighted_voting"
)

Custom MCP Tool Development

# Custom MCP tool for traffic information
def create_traffic_tool():
    """Create a traffic-aware MCP tool"""
    
    def get_traffic_conditions(location, time):
        # Stub implementation - replace with real traffic API
        return {
            'congestion_level': 'moderate',
            'travel_time_factor': 1.2,
            'alternative_routes': ['route_a', 'route_b'],
            'peak_hours': [8, 9, 17, 18, 19]
        }
    
    return {
        'name': 'traffic',
        'description': 'Get real-time traffic conditions',
        'function': get_traffic_conditions
    }

# Use in sub-agent configuration
traffic_aware_config = {
    "name": "traffic_expert",
    "prompt": """Consider traffic conditions in location selection:
    - Avoid high-congestion areas during peak hours  
    - Suggest alternative routes when needed
    - Factor in travel time impacts""",
    "mcp": {
        "traffic": {}  # Custom traffic tool
    }
}

Advanced Reflection Patterns

Multi-Round Reflection

Enable multiple rounds of reflection for complex decisions:

multi_round_cfg = dict(
    max_round=3,
    subAgents=[
        {
            "name": "initial_analyzer",
            "prompt": "Provide initial analysis and broad location candidates."
        },
        {
            "name": "detail_evaluator", 
            "prompt": "Evaluate initial suggestions in detail and refine selections."
        },
        {
            "name": "final_validator",
            "prompt": "Validate final selections against all available evidence."
        }
    ],
    aggregator="consensus_with_validation",
    round_feedback=True  # Allow agents to see previous round results
)

Hierarchical Agent Structure

Create hierarchical agent structures for complex reasoning:

hierarchical_cfg = dict(
    max_round=1,
    subAgents=[
        # Tier 1: Context Analysts
        {
            "name": "context_temporal",
            "prompt": "Analyze temporal context and provide time-based insights.",
            "tier": 1
        },
        {
            "name": "context_spatial", 
            "prompt": "Analyze spatial context and provide location insights.",
            "tier": 1
        },
        # Tier 2: Decision Makers (receive Tier 1 outputs)
        {
            "name": "decision_primary",
            "prompt": "Make primary location decision based on context analysis.",
            "tier": 2,
            "depends_on": ["context_temporal", "context_spatial"]
        },
        {
            "name": "decision_alternative",
            "prompt": "Provide alternative location decision and backup options.",
            "tier": 2,
            "depends_on": ["context_temporal", "context_spatial"]
        }
    ],
    aggregator="hierarchical_consensus"
)

Performance and Optimization

Parallel Sub-Agent Execution

import asyncio
from datetime import datetime

async def execute_subagents_parallel(subagents, query, llm):
    """Execute sub-agents in parallel for better performance"""
    
    async def execute_single_agent(agent_config):
        # Execute single sub-agent
        result = await llm.ainvoke(agent_config['prompt'] + f"\nQuery: {query}")
        return {
            'agent': agent_config['name'],
            'result': result,
            'timestamp': datetime.now().isoformat()
        }
    
    # Execute all sub-agents concurrently  
    tasks = [execute_single_agent(agent) for agent in subagents]
    results = await asyncio.gather(*tasks)
    
    return results
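
A minimal driver sketch; the _EchoLLM stub is a stand-in for any LLM client exposing an async ainvoke method, which this helper assumes:

# Stand-in LLM client for illustration; a real client would call an API
class _EchoLLM:
    async def ainvoke(self, prompt: str):
        return {"selection": "loc_42", "confidence": 0.6, "rationale": prompt[:60]}

subagents = [
    {"name": "temporal", "prompt": "Select the most likely location id from Options."},
    {"name": "spatial", "prompt": "Select the most likely location id from Options."},
]

results = asyncio.run(
    execute_subagents_parallel(subagents, "next_position for user_123 at 12:30", _EchoLLM())
)
for r in results:
    print(r["agent"], r["result"])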

Caching Sub-Agent Results

class CachedReflectionAgent:
    def __init__(self, base_agent, cache_manager):
        self.base_agent = base_agent
        self.cache = cache_manager
        
    def invoke(self, query):
        # Create cache key from query
        cache_key = self._create_cache_key(query)
        
        # Try to get cached result
        cached_result = self.cache.get_reflection_result(cache_key)
        if cached_result:
            return cached_result
        
        # Execute reflection if not cached
        result = self.base_agent.invoke(query)
        
        # Cache result
        self.cache.save_reflection_result(cache_key, result)
        
        return result
    
    def _create_cache_key(self, query):
        import hashlib
        query_str = f"{query.get('user_id', '')}_{query.get('task', '')}_{query.get('t', '')}"
        return hashlib.md5(query_str.encode()).hexdigest()
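
A hypothetical in-memory cache manager is enough to exercise the wrapper; MRRA's own cache layer may expose different method names:

# Minimal in-memory cache manager matching the interface assumed above
class InMemoryCacheManager:
    def __init__(self):
        self._store = {}

    def get_reflection_result(self, key):
        return self._store.get(key)

    def save_reflection_result(self, key, result):
        self._store[key] = result

cached_agent = CachedReflectionAgent(agent, InMemoryCacheManager())
result = cached_agent.invoke({"task": "next_position", "user_id": "user_123", "t": "2024-09-10 12:30:00"})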

Performance Considerations:

  • Parallel execution can significantly speed up multi-agent systems
  • Cache sub-agent results for repeated similar queries
  • Consider sub-agent complexity vs. accuracy trade-offs
  • Monitor LLM API costs with multiple agents

Debugging and Monitoring

Sub-Agent Response Analysis

import numpy as np

def analyze_subagent_responses(responses):
    """Analyze sub-agent response patterns"""
    
    analysis = {
        'response_count': len(responses),
        'confidence_stats': {},
        'selection_distribution': {},
        'reasoning_themes': []
    }
    
    # Confidence statistics
    confidences = [r.get('confidence', 0) for r in responses]
    analysis['confidence_stats'] = {
        'mean': sum(confidences) / len(confidences) if confidences else 0,
        'min': min(confidences) if confidences else 0,
        'max': max(confidences) if confidences else 0,
        'std': np.std(confidences) if confidences else 0
    }
    
    # Selection distribution
    selections = [r.get('selection') for r in responses if r.get('selection')]
    for selection in selections:
        analysis['selection_distribution'][selection] = analysis['selection_distribution'].get(selection, 0) + 1
    
    # Reasoning analysis (simplified)
    rationales = [r.get('rationale', '') for r in responses]
    common_terms = {}
    for rationale in rationales:
        words = rationale.lower().split()
        for word in words:
            if len(word) > 3:  # Filter short words
                common_terms[word] = common_terms.get(word, 0) + 1
    
    analysis['reasoning_themes'] = sorted(common_terms.items(), key=lambda x: x[1], reverse=True)[:10]
    
    return analysis
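
Running the analysis on raw sub-agent responses (hypothetical values) before aggregation helps spot disagreement or systematically low confidence:

# Hypothetical raw responses captured before aggregation
report = analyze_subagent_responses([
    {"selection": "loc_A", "confidence": 0.8, "rationale": "weekday lunch pattern"},
    {"selection": "loc_B", "confidence": 0.3, "rationale": "rare weekend trip"},
])
print(report["confidence_stats"])        # mean/min/max/std of confidences
print(report["selection_distribution"])  # vote counts per candidate location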

Next Steps