MCP Integration
Model Context Protocol integration for external tools and services
MCP Integration
MCP (Model Context Protocol) enables MRRA agents to safely access external capabilities and real-time data sources. MRRA provides three categories of MCP tools for enhanced mobility prediction.
Available MCP Tools
MRRA includes several built-in MCP tools and supports custom integrations:
Built-in Tools
Purpose: Weather context for mobility decisions
Features:
- Current weather conditions
- Weather impact on mobility patterns
- Indoor/outdoor activity suggestions
Configuration:
"mcp": {
"weather": {} # Enable weather tool (offline stub)
}Usage Example:
# Weather-aware agent configuration
{
"name": "weather_expert",
"prompt": "Consider weather conditions when selecting locations. Prioritize indoor locations during bad weather.",
"mcp": {
"weather": {}
}
}Purpose: Nearby POI discovery and analysis
Features:
- Point of Interest (POI) lookup
- Business hours and availability
- Category-based filtering
Configuration:
"mcp": {
"maps": {} # Enable POI tool (offline stub)
}Usage Example:
# POI-aware agent configuration
{
"name": "poi_expert",
"prompt": "Use nearby POI information to select appropriate locations based on business hours and categories.",
"mcp": {
"maps": {}
}
}Purpose: Remote MCP map services
Features:
- Real-time map data
- Multiple transport protocols
- Custom map service integration
Configuration:
"mcp": {
"gmap": {
"url": "https://your-mcp-server/sse",
"transport": "sse" # sse / websocket / streamable_http / stdio
}
}Supported Transports:
sse: Server-Sent Eventswebsocket: WebSocket connectionstreamable_http: HTTP streamingstdio: Standard input/output
Basic MCP Integration
Single Tool Integration
from mrra.agents.builder import build_mrra_agent
# Simple weather-aware agent
reflection_cfg = dict(
max_round=1,
subAgents=[
{
"name": "weather_aware",
"prompt": """Consider weather conditions in location selection:
- Indoor locations during bad weather
- Outdoor activities during good weather
- Weather-appropriate timing adjustments""",
"mcp": {
"weather": {}
}
},
{
"name": "baseline",
"prompt": "Select based on historical patterns."
}
],
aggregator="confidence_weighted_voting"
)
agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)Multiple Tool Integration
# Multi-tool agent configuration
advanced_mcp_cfg = dict(
max_round=1,
subAgents=[
{
"name": "context_expert",
"prompt": """Use all available external context:
- Weather conditions for indoor/outdoor decisions
- POI availability and business hours
- Real-time map data for accessibility
Integrate all contexts for optimal location selection.""",
"mcp": {
"weather": {},
"maps": {},
"gmap": {
"url": "https://your-map-service.com/mcp",
"transport": "sse"
}
}
},
{
"name": "historical_expert",
"prompt": "Focus on historical patterns and user preferences."
}
],
aggregator="confidence_weighted_voting"
)Advanced MCP Configuration
Custom MCP Server Integration
# Connect to custom MCP services
custom_mcp_cfg = dict(
max_round=1,
subAgents=[
{
"name": "traffic_aware",
"prompt": "Consider real-time traffic conditions in route selection.",
"mcp": {
"traffic": {
"url": "wss://traffic-service.example.com/mcp",
"transport": "websocket",
"headers": {
"Authorization": "Bearer YOUR_TOKEN"
},
"timeout": 30
}
}
},
{
"name": "events_aware",
"prompt": "Consider local events and their impact on mobility.",
"mcp": {
"events": {
"url": "https://events-api.example.com/mcp/stream",
"transport": "streamable_http"
}
}
}
]
)Environment-Based Configuration
import os
# Environment-driven MCP configuration
def create_mcp_config():
config = {}
# Weather service
if os.getenv('WEATHER_MCP_URL'):
config['weather'] = {
'url': os.getenv('WEATHER_MCP_URL'),
'transport': os.getenv('WEATHER_TRANSPORT', 'sse'),
'api_key': os.getenv('WEATHER_API_KEY')
}
else:
config['weather'] = {} # Use stub
# Maps service
if os.getenv('MAPS_MCP_URL'):
config['maps'] = {
'url': os.getenv('MAPS_MCP_URL'),
'transport': os.getenv('MAPS_TRANSPORT', 'sse')
}
else:
config['maps'] = {} # Use stub
return config
# Use in agent configuration
env_mcp_cfg = dict(
max_round=1,
subAgents=[
{
"name": "adaptive_expert",
"prompt": "Adapt to available external services and contexts.",
"mcp": create_mcp_config()
}
]
)Custom MCP Tool Development
Creating Custom Tools
# Custom MCP tool for public transport
def create_transport_tool():
"""Create public transport MCP tool"""
def get_transport_options(location, destination, time):
# Implementation for transport API
return {
'routes': [
{
'mode': 'subway',
'duration': 25,
'cost': 3.50,
'transfers': 1
},
{
'mode': 'bus',
'duration': 35,
'cost': 2.50,
'transfers': 0
}
],
'next_departure': '12:45',
'service_status': 'normal'
}
def get_station_info(location):
# Get nearby stations
return {
'nearest_stations': [
{'name': 'Metro Station A', 'distance': 200, 'lines': ['Line 1', 'Line 2']},
{'name': 'Bus Stop B', 'distance': 150, 'routes': ['Route 10', 'Route 15']}
]
}
return {
'transport_routes': get_transport_options,
'transport_stations': get_station_info
}
# Register custom tool
transport_cfg = dict(
max_round=1,
subAgents=[
{
"name": "transport_expert",
"prompt": """Use public transport information for location recommendations:
- Consider transport accessibility
- Factor in route efficiency
- Account for service disruptions""",
"mcp": {
"transport": create_transport_tool()
}
}
]
)MCP Tool Factory Pattern
class MCPToolFactory:
"""Factory for creating and managing MCP tools"""
def __init__(self):
self.tools = {}
self.connections = {}
def register_tool(self, name, tool_config):
"""Register a new MCP tool"""
self.tools[name] = tool_config
def create_connection(self, tool_name, config):
"""Create MCP connection for a tool"""
if config.get('url'):
# Remote MCP connection
transport = config.get('transport', 'sse')
connection = self._create_remote_connection(config['url'], transport, config)
else:
# Local tool stub
connection = self._create_local_stub(tool_name)
self.connections[tool_name] = connection
return connection
def _create_remote_connection(self, url, transport, config):
"""Create remote MCP connection"""
if transport == 'sse':
return SSEMCPConnection(url, config)
elif transport == 'websocket':
return WebSocketMCPConnection(url, config)
elif transport == 'streamable_http':
return HTTPMCPConnection(url, config)
else:
raise ValueError(f"Unsupported transport: {transport}")
def _create_local_stub(self, tool_name):
"""Create local tool stub"""
if tool_name == 'weather':
return WeatherStub()
elif tool_name == 'maps':
return MapsStub()
else:
return GenericStub(tool_name)
def get_agent_tools(self, mcp_config):
"""Get tools for agent based on configuration"""
agent_tools = {}
for tool_name, tool_config in mcp_config.items():
if tool_name not in self.connections:
self.create_connection(tool_name, tool_config)
agent_tools[tool_name] = self.connections[tool_name]
return agent_tools
# Usage
tool_factory = MCPToolFactory()
# Register custom tools
tool_factory.register_tool('traffic', {
'description': 'Real-time traffic information',
'methods': ['get_traffic', 'get_incidents']
})
# Use in agent configuration
factory_cfg = dict(
max_round=1,
subAgents=[
{
"name": "comprehensive_expert",
"prompt": "Use all available tools for comprehensive analysis.",
"mcp": {
"weather": {"url": "https://weather.example.com/mcp"},
"traffic": {"url": "https://traffic.example.com/mcp"},
"events": {} # Local stub
}
}
]
)MCP Error Handling
Robust Error Handling
class ResilientMCPAgent:
"""Agent with robust MCP error handling"""
def __init__(self, agent_config, mcp_tools):
self.agent_config = agent_config
self.mcp_tools = mcp_tools
self.fallback_enabled = True
def invoke_with_mcp(self, query):
"""Invoke agent with MCP tools and fallback handling"""
# Attempt to use MCP tools
mcp_context = {}
for tool_name, tool in self.mcp_tools.items():
try:
result = self._call_mcp_tool(tool, query, timeout=10)
mcp_context[tool_name] = result
except MCPTimeoutError:
print(f"MCP tool {tool_name} timed out, continuing without it")
mcp_context[tool_name] = None
except MCPConnectionError:
print(f"MCP tool {tool_name} connection failed, using fallback")
mcp_context[tool_name] = self._get_fallback_data(tool_name, query)
except Exception as e:
print(f"Unexpected error with MCP tool {tool_name}: {e}")
mcp_context[tool_name] = None
# Enhance query with MCP context
enhanced_query = {**query, 'mcp_context': mcp_context}
# Execute agent with enhanced context
return self.base_agent.invoke(enhanced_query)
def _call_mcp_tool(self, tool, query, timeout=10):
"""Call MCP tool with timeout"""
import signal
def timeout_handler(signum, frame):
raise MCPTimeoutError("MCP tool call timed out")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(timeout)
try:
result = tool.call(query)
signal.alarm(0) # Cancel timeout
return result
except Exception as e:
signal.alarm(0) # Cancel timeout
raise e
def _get_fallback_data(self, tool_name, query):
"""Provide fallback data when MCP tools fail"""
fallbacks = {
'weather': {'condition': 'unknown', 'temperature': None},
'maps': {'pois': [], 'status': 'unavailable'},
'traffic': {'condition': 'unknown', 'delay': 0}
}
return fallbacks.get(tool_name, {'status': 'unavailable'})Performance and Optimization
MCP Connection Pooling
class MCPConnectionPool:
"""Connection pool for MCP services"""
def __init__(self, max_connections=10):
self.max_connections = max_connections
self.pools = {}
def get_connection(self, service_url, transport='sse'):
"""Get or create connection from pool"""
pool_key = f"{service_url}_{transport}"
if pool_key not in self.pools:
self.pools[pool_key] = []
pool = self.pools[pool_key]
# Reuse existing connection if available
for conn in pool:
if conn.is_available():
return conn
# Create new connection if pool not full
if len(pool) < self.max_connections:
conn = self._create_connection(service_url, transport)
pool.append(conn)
return conn
# Wait for available connection
return self._wait_for_connection(pool)
def _create_connection(self, url, transport):
"""Create new MCP connection"""
# Implementation depends on transport type
pass
def _wait_for_connection(self, pool):
"""Wait for available connection in pool"""
import time
for _ in range(50): # Max 5 seconds wait
for conn in pool:
if conn.is_available():
return conn
time.sleep(0.1)
raise MCPConnectionError("No available connections in pool")
# Use connection pool
connection_pool = MCPConnectionPool(max_connections=5)
# In agent configuration
pooled_mcp_cfg = dict(
max_round=1,
subAgents=[
{
"name": "pooled_expert",
"mcp": {
"weather": {
"url": "https://weather.example.com/mcp",
"connection_pool": connection_pool
}
}
}
]
)Dependencies: For full MCP functionality, install: pip install langchain-mcp-adapters mcp anyio httpx-sse
The system gracefully falls back to local stubs when MCP services are unavailable.
Debugging MCP Integration
MCP Tool Testing
def test_mcp_tools(mcp_config):
"""Test MCP tool connectivity and functionality"""
results = {}
for tool_name, config in mcp_config.items():
print(f"Testing MCP tool: {tool_name}")
try:
if config.get('url'):
# Test remote connection
result = test_remote_mcp_tool(config['url'], config.get('transport', 'sse'))
else:
# Test local stub
result = test_local_mcp_tool(tool_name)
results[tool_name] = {
'status': 'success',
'latency': result.get('latency', 0),
'capabilities': result.get('capabilities', [])
}
print(f" ✅ {tool_name}: OK (latency: {result.get('latency', 0)}ms)")
except Exception as e:
results[tool_name] = {
'status': 'error',
'error': str(e)
}
print(f" ❌ {tool_name}: ERROR - {e}")
return results
# Test configuration
test_results = test_mcp_tools({
'weather': {'url': 'https://weather.example.com/mcp'},
'maps': {}, # Local stub
'traffic': {'url': 'wss://traffic.example.com/mcp', 'transport': 'websocket'}
})
print("MCP Test Results:", test_results)Production Considerations:
- Implement proper authentication for remote MCP services
- Set appropriate timeouts to prevent hanging
- Use connection pooling for high-throughput scenarios
- Monitor MCP service health and implement fallback strategies
Next Steps
- Learn about Caching and Performance for optimization strategies
- Explore Troubleshooting for common MCP issues
- Check out Configuration for advanced setup options