Multi-Agent Reflection
Principles and configuration of multi-agent reflection systems
Multi-agent reflection is a core mechanism in MRRA that uses multiple specialized sub-agents to collaboratively solve mobility prediction problems. Each sub-agent contributes a different area of expertise, and their outputs are aggregated into robust, interpretable results.
Core Concept
The multi-agent reflection system works through these steps:
- Multiple Sub-Agents: Each specialized in different aspects (temporal, spatial, routing, purpose-based)
- Structured Responses: Each sub-agent provides `selection`/`path_ids`, `confidence`, and `rationale`
- Aggregation: An aggregator combines responses using various strategies
- Tool Integration: Sub-agents can access external tools via MCP protocol
Key Benefits:
- Robustness: Multiple perspectives reduce single-point failures
- Interpretability: Each sub-agent provides reasoning rationale
- Extensibility: Easy to add new specialized agents or tools
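The structured response contract above can be sketched as a small schema. This is a hypothetical illustration of the `selection`/`confidence`/`rationale` fields, not a verbatim MRRA type:

```python
from dataclasses import dataclass

@dataclass
class SubAgentResponse:
    """Illustrative shape of the structured output a sub-agent returns."""
    selection: str       # chosen location id (path_ids for route tasks)
    confidence: float    # self-reported confidence, expected in [0, 1]
    rationale: str = ""  # free-text reasoning, kept for interpretability

    def __post_init__(self):
        # Clamp out-of-range confidences so aggregation stays well-behaved.
        self.confidence = min(max(self.confidence, 0.0), 1.0)

response = SubAgentResponse(selection="loc_42", confidence=1.3,
                            rationale="matches lunch-hour pattern")
```

Keeping confidence clamped at the schema boundary means aggregation strategies downstream never have to re-validate it.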
Basic Configuration
Simple Two-Agent Setup
```python
from mrra.agents.builder import build_mrra_agent

# Basic reflection configuration
reflection_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "temporal",
            "prompt": "Select the most likely location id from Options (selection), do not output coordinates."
        },
        {
            "name": "spatial",
            "prompt": "Select the most likely location id from Options (selection), do not output coordinates."
        },
    ],
    aggregator="confidence_weighted_voting",
)

# Build agent with reflection
agent = build_mrra_agent(
    llm=llm_cfg,
    retriever=retriever,
    reflection=reflection_cfg
)

# Execute prediction
result = agent.invoke({
    "task": "next_position",
    "user_id": "user_123",
    "t": "2024-09-10 12:30:00"
})
```

Advanced Multi-Agent Configuration
```python
# Advanced configuration with specialized agents
advanced_reflection_cfg = dict(
    max_round=2,  # Allow multiple rounds of reflection
    subAgents=[
        {
            "name": "temporal_expert",
            "prompt": """You are a temporal mobility expert. Analyze time patterns and select the most likely location based on:
- Hour-of-day preferences
- Day-of-week patterns
- Historical timing patterns
Select from Options (selection) and provide confidence score.""",
            "temperature": 0.1,  # Lower temperature for consistency
        },
        {
            "name": "spatial_expert",
            "prompt": """You are a spatial mobility expert. Analyze location patterns and select the most likely location based on:
- Spatial clustering and hotspots
- Distance and accessibility
- Geographic context
Select from Options (selection) and provide confidence score.""",
            "temperature": 0.1,
        },
        {
            "name": "purpose_expert",
            "prompt": """You are a purpose-driven mobility expert. Analyze activity purposes and select the most likely location based on:
- Activity purpose patterns
- Sequential activity logic
- Context-appropriate locations
Select from Options (selection) and provide confidence score.""",
            "temperature": 0.2,
        },
        {
            "name": "routing_expert",
            "prompt": """You are a routing and transition expert. Analyze movement patterns and select the most likely location based on:
- Common transition routes
- Movement efficiency
- Sequential location logic
Select from Options (selection) and provide confidence score.""",
            "temperature": 0.15,
        }
    ],
    aggregator="confidence_weighted_voting",
    consensus_threshold=0.7,  # Require high consensus for final decision
)
```

Sub-Agent Specializations
Temporal Sub-Agents
Focus on time-based patterns and preferences:
```python
{
    "name": "hourly_expert",
    "prompt": """Analyze hourly mobility patterns:
- Peak activity hours (7-9am, 12-2pm, 5-7pm)
- Off-peak behavior patterns
- Lunch break timing (11am-2pm)
- Evening routines (6-10pm)
Consider the current time context and select the most temporally appropriate location."""
}
```

```python
{
    "name": "weekly_expert",
    "prompt": """Analyze weekly mobility patterns:
- Weekday vs weekend behavior
- Monday morning patterns
- Friday evening patterns
- Weekend leisure activities
Consider day-of-week context and select accordingly."""
}
```

```python
{
    "name": "seasonal_expert",
    "prompt": """Analyze seasonal and calendar patterns:
- Weather impact on mobility
- Holiday and special event patterns
- Seasonal activity preferences
- Academic calendar influences
Consider seasonal context in your selection."""
}
```

Spatial Sub-Agents
Focus on geographic and location-based analysis:
```python
spatial_agents = [
    {
        "name": "clustering_expert",
        "prompt": """Analyze spatial clustering patterns:
- Identify activity hotspots and clusters
- Consider spatial density and accessibility
- Evaluate distance relationships
- Assess geographic constraints"""
    },
    {
        "name": "poi_expert",
        "prompt": """Analyze Point-of-Interest patterns:
- Consider POI categories and purposes
- Evaluate business hours and availability
- Assess POI popularity and capacity
- Consider service accessibility"""
    },
    {
        "name": "network_expert",
        "prompt": """Analyze transportation network factors:
- Consider route accessibility
- Evaluate travel time and distance
- Assess transportation mode preferences
- Consider network connectivity"""
    }
]
```

Purpose-Driven Sub-Agents
Focus on activity purposes and contextual logic:
```python
purpose_agents = [
    {
        "name": "activity_sequence_expert",
        "prompt": """Analyze activity sequence logic:
- Common activity transitions (work→dining→work→home)
- Purpose-driven location selection
- Activity duration expectations
- Sequential consistency"""
    },
    {
        "name": "lifestyle_expert",
        "prompt": """Analyze lifestyle and behavioral patterns:
- Personal preferences and habits
- Routine vs spontaneous behavior
- Social and cultural factors
- Individual mobility style"""
    }
]
```

Aggregation Strategies
Confidence Weighted Voting
The default aggregation strategy that weights each sub-agent's vote by their confidence:
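For intuition, here is the arithmetic on a toy set of responses (the agent votes and confidence values are illustrative):

```python
# Toy illustration of confidence-weighted voting: three agents vote,
# two for loc_1 and one for loc_2, with different confidences.
responses = [
    {"selection": "loc_1", "confidence": 0.9},
    {"selection": "loc_1", "confidence": 0.7},
    {"selection": "loc_2", "confidence": 0.4},
]

scores = {}
total = 0.0
for r in responses:
    scores[r["selection"]] = scores.get(r["selection"], 0.0) + r["confidence"]
    total += r["confidence"]

# Normalize: loc_1 holds 1.6 of the 2.0 total confidence mass (0.8),
# loc_2 holds 0.4 of 2.0 (0.2), so loc_1 wins.
scores = {loc: s / total for loc, s in scores.items()}
best = max(scores.items(), key=lambda kv: kv[1])
```

The implementation below generalizes this, skipping responses without a selection and attaching a rationale.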
```python
def confidence_weighted_voting(subagent_responses):
    """Aggregate responses using confidence-weighted voting."""
    location_scores = {}
    total_confidence = 0.0
    for response in subagent_responses:
        selection = response.get('selection')
        confidence = response.get('confidence', 0.5)
        if selection:
            location_scores[selection] = location_scores.get(selection, 0.0) + confidence
            total_confidence += confidence
    if not location_scores:
        return None

    # Normalize scores so they sum to 1
    for location in location_scores:
        location_scores[location] /= total_confidence

    # Select the highest-scoring location
    best_location = max(location_scores.items(), key=lambda x: x[1])
    supporters = sum(1 for r in subagent_responses
                     if r.get('selection') == best_location[0])
    return {
        'selection': best_location[0],
        'confidence': best_location[1],
        'scores': location_scores,
        'rationale': f"Selected by {supporters} agents with normalized score {best_location[1]:.3f}"
    }
```

Custom Aggregation Strategies
```python
class ConsensusAggregator:
    def __init__(self, consensus_threshold=0.6):
        self.consensus_threshold = consensus_threshold

    def aggregate(self, subagent_responses):
        """Require consensus above the configured threshold."""
        selections = [r.get('selection') for r in subagent_responses if r.get('selection')]
        if not selections:
            return None

        # Count selections
        selection_counts = {}
        for selection in selections:
            selection_counts[selection] = selection_counts.get(selection, 0) + 1

        # Check for consensus
        total_agents = len(subagent_responses)
        for selection, count in selection_counts.items():
            if count / total_agents >= self.consensus_threshold:
                avg_confidence = sum(
                    r.get('confidence', 0.5) for r in subagent_responses
                    if r.get('selection') == selection
                ) / count
                return {
                    'selection': selection,
                    'confidence': avg_confidence,
                    'consensus_ratio': count / total_agents,
                    'rationale': f"Consensus reached by {count}/{total_agents} agents"
                }

        # No consensus reached
        return {
            'selection': max(selection_counts.items(), key=lambda x: x[1])[0],
            'confidence': 0.3,  # Low confidence due to lack of consensus
            'consensus_ratio': max(selection_counts.values()) / total_agents,
            'rationale': "No strong consensus, selected most popular choice"
        }
```

Tool Integration via MCP
Sub-agents can access external tools through the MCP (Model Context Protocol) framework:
Weather Tool Integration
```python
reflection_cfg_with_weather = dict(
    max_round=1,
    subAgents=[
        {
            "name": "weather_aware_agent",
            "prompt": """Consider weather conditions in your location selection:
- Indoor vs outdoor activity preferences
- Weather-appropriate locations
- Seasonal activity adjustments
Select from Options and explain weather impact.""",
            "mcp": {
                "weather": {}  # Enable weather tool
            }
        },
        {
            "name": "baseline_agent",
            "prompt": "Select based on historical patterns without external context."
        }
    ],
    aggregator="confidence_weighted_voting"
)
```

Maps and POI Integration
```python
reflection_cfg_with_maps = dict(
    max_round=1,
    subAgents=[
        {
            "name": "poi_aware_agent",
            "prompt": """Use nearby POI information to inform location selection:
- Business hours and availability
- POI categories and services
- Popular destinations
Select from Options using POI context.""",
            "mcp": {
                "maps": {},  # Enable POI lookup tool
                "gmap": {    # Enable remote map service
                    "url": "https://your-mcp-server/sse",
                    "transport": "sse"
                }
            }
        }
    ],
    aggregator="confidence_weighted_voting"
)
```

Custom MCP Tool Development
```python
# Custom MCP tool for traffic information
def create_traffic_tool():
    """Create a traffic-aware MCP tool."""
    def get_traffic_conditions(location, time):
        # Stub implementation - replace with a real traffic API
        return {
            'congestion_level': 'moderate',
            'travel_time_factor': 1.2,
            'alternative_routes': ['route_a', 'route_b'],
            'peak_hours': [8, 9, 17, 18, 19]
        }

    return {
        'name': 'traffic',
        'description': 'Get real-time traffic conditions',
        'function': get_traffic_conditions
    }

# Use in sub-agent configuration
traffic_aware_config = {
    "name": "traffic_expert",
    "prompt": """Consider traffic conditions in location selection:
- Avoid high-congestion areas during peak hours
- Suggest alternative routes when needed
- Factor in travel time impacts""",
    "mcp": {
        "traffic": {}  # Custom traffic tool
    }
}
```

Advanced Reflection Patterns
Multi-Round Reflection
Enable multiple rounds of reflection for complex decisions:
```python
multi_round_cfg = dict(
    max_round=3,
    subAgents=[
        {
            "name": "initial_analyzer",
            "prompt": "Provide initial analysis and broad location candidates."
        },
        {
            "name": "detail_evaluator",
            "prompt": "Evaluate initial suggestions in detail and refine selections."
        },
        {
            "name": "final_validator",
            "prompt": "Validate final selections against all available evidence."
        }
    ],
    aggregator="consensus_with_validation",
    round_feedback=True  # Allow agents to see previous round results
)
```

Hierarchical Agent Structure
Create hierarchical agent structures for complex reasoning:
```python
hierarchical_cfg = dict(
    max_round=1,
    subAgents=[
        # Tier 1: Context Analysts
        {
            "name": "context_temporal",
            "prompt": "Analyze temporal context and provide time-based insights.",
            "tier": 1
        },
        {
            "name": "context_spatial",
            "prompt": "Analyze spatial context and provide location insights.",
            "tier": 1
        },
        # Tier 2: Decision Makers (receive Tier 1 outputs)
        {
            "name": "decision_primary",
            "prompt": "Make primary location decision based on context analysis.",
            "tier": 2,
            "depends_on": ["context_temporal", "context_spatial"]
        },
        {
            "name": "decision_alternative",
            "prompt": "Provide alternative location decision and backup options.",
            "tier": 2,
            "depends_on": ["context_temporal", "context_spatial"]
        }
    ],
    aggregator="hierarchical_consensus"
)
```

Performance and Optimization
Parallel Sub-Agent Execution
```python
import asyncio
from datetime import datetime

async def execute_subagents_parallel(subagents, query, llm):
    """Execute sub-agents in parallel for better performance."""
    async def execute_single_agent(agent_config):
        # Execute a single sub-agent
        result = await llm.ainvoke(agent_config['prompt'] + f"\nQuery: {query}")
        return {
            'agent': agent_config['name'],
            'result': result,
            'timestamp': datetime.now().isoformat()
        }

    # Execute all sub-agents concurrently
    tasks = [execute_single_agent(agent) for agent in subagents]
    results = await asyncio.gather(*tasks)
    return results
```

Caching Sub-Agent Results
```python
import hashlib

class CachedReflectionAgent:
    def __init__(self, base_agent, cache_manager):
        self.base_agent = base_agent
        self.cache = cache_manager

    def invoke(self, query):
        # Create a cache key from the query
        cache_key = self._create_cache_key(query)

        # Try to get a cached result
        cached_result = self.cache.get_reflection_result(cache_key)
        if cached_result:
            return cached_result

        # Execute reflection if not cached
        result = self.base_agent.invoke(query)

        # Cache the result
        self.cache.save_reflection_result(cache_key, result)
        return result

    def _create_cache_key(self, query):
        query_str = f"{query.get('user_id', '')}_{query.get('task', '')}_{query.get('t', '')}"
        return hashlib.md5(query_str.encode()).hexdigest()
```

Performance Considerations:
- Parallel execution can significantly speed up multi-agent systems
- Cache sub-agent results for repeated similar queries
- Consider sub-agent complexity vs. accuracy trade-offs
- Monitor LLM API costs with multiple agents
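On the cost point, the number of LLM calls grows with agents × rounds. A back-of-envelope estimator can make the trade-off concrete (token counts and the per-1k-token price below are illustrative placeholders, not real provider rates):

```python
def estimate_reflection_cost(n_agents, max_round,
                             tokens_per_call=1500, price_per_1k_tokens=0.002):
    """Back-of-envelope LLM cost per query for a multi-agent reflection run."""
    # One call per sub-agent per round, plus one aggregation/summary call.
    calls = n_agents * max_round + 1
    total_tokens = calls * tokens_per_call
    return total_tokens / 1000 * price_per_1k_tokens

# Four experts over two rounds -> 9 calls at the placeholder rate.
cost = estimate_reflection_cost(n_agents=4, max_round=2)
```

Doubling either the agent count or `max_round` roughly doubles the bill, which is why the complexity-vs-accuracy trade-off above matters.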
Debugging and Monitoring
Sub-Agent Response Analysis
```python
import numpy as np

def analyze_subagent_responses(responses):
    """Analyze sub-agent response patterns."""
    analysis = {
        'response_count': len(responses),
        'confidence_stats': {},
        'selection_distribution': {},
        'reasoning_themes': []
    }

    # Confidence statistics
    confidences = [r.get('confidence', 0) for r in responses]
    analysis['confidence_stats'] = {
        'mean': sum(confidences) / len(confidences) if confidences else 0,
        'min': min(confidences) if confidences else 0,
        'max': max(confidences) if confidences else 0,
        'std': float(np.std(confidences)) if confidences else 0
    }

    # Selection distribution
    selections = [r.get('selection') for r in responses if r.get('selection')]
    for selection in selections:
        analysis['selection_distribution'][selection] = \
            analysis['selection_distribution'].get(selection, 0) + 1

    # Reasoning analysis (simplified term frequency)
    rationales = [r.get('rationale', '') for r in responses]
    common_terms = {}
    for rationale in rationales:
        for word in rationale.lower().split():
            if len(word) > 3:  # Filter short words
                common_terms[word] = common_terms.get(word, 0) + 1
    analysis['reasoning_themes'] = sorted(
        common_terms.items(), key=lambda x: x[1], reverse=True
    )[:10]

    return analysis
```

Next Steps
- Explore MCP Integration for external tool integration
- Learn about Caching and Performance optimization
- Check out Troubleshooting for common issues and solutions