MCP Integration

Model Context Protocol integration for external tools and services

MCP (Model Context Protocol) lets MRRA agents safely access external capabilities and real-time data sources. MRRA provides three categories of MCP tools to improve mobility prediction: a weather tool, a POI (maps) tool, and remote map services.

Available MCP Tools

MRRA includes several built-in MCP tools and supports custom integrations:

Built-in Tools

Weather tool ("weather")

Purpose: Weather context for mobility decisions

Features:

  • Current weather conditions
  • Weather impact on mobility patterns
  • Indoor/outdoor activity suggestions

Configuration:

"mcp": {
    "weather": {}  # Enable weather tool (offline stub)
}

Usage Example:

# Weather-aware agent configuration
{
    "name": "weather_expert",
    "prompt": "Consider weather conditions when selecting locations. Prioritize indoor locations during bad weather.",
    "mcp": {
        "weather": {}
    }
}

POI tool ("maps")

Purpose: Nearby POI discovery and analysis

Features:

  • Point of Interest (POI) lookup
  • Business hours and availability
  • Category-based filtering

Configuration:

"mcp": {
    "maps": {}  # Enable POI tool (offline stub)
}

Usage Example:

# POI-aware agent configuration
{
    "name": "poi_expert", 
    "prompt": "Use nearby POI information to select appropriate locations based on business hours and categories.",
    "mcp": {
        "maps": {}
    }
}

Remote map service ("gmap")

Purpose: Remote MCP map services

Features:

  • Real-time map data
  • Multiple transport protocols
  • Custom map service integration

Configuration:

"mcp": {
    "gmap": {
        "url": "https://your-mcp-server/sse",
        "transport": "sse"  # sse / websocket / streamable_http / stdio
    }
}

Supported Transports:

  • sse: Server-Sent Events
  • websocket: WebSocket connection
  • streamable_http: HTTP streaming
  • stdio: Standard input/output
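A remote tool entry can be sanity-checked before it is handed to an agent. The following is a minimal sketch: the helper name `validate_remote_entry` is hypothetical and not part of MRRA, and it assumes only the `url` and `transport` keys and the four transport names listed above. The rule that every transport except `stdio` needs a URL is also an assumption.

```python
# Hypothetical helper, not part of MRRA: checks the shape of a remote MCP entry.
SUPPORTED_TRANSPORTS = {"sse", "websocket", "streamable_http", "stdio"}


def validate_remote_entry(name, entry):
    """Return a list of problems found in one remote MCP tool entry."""
    problems = []
    transport = entry.get("transport")
    if transport not in SUPPORTED_TRANSPORTS:
        problems.append(f"{name}: unsupported transport {transport!r}")
    # Assumption: every transport except stdio needs a URL to connect to.
    if transport != "stdio" and not entry.get("url"):
        problems.append(f"{name}: missing 'url'")
    return problems


# A well-formed entry yields no problems; a malformed one yields one message per issue.
print(validate_remote_entry("gmap", {"url": "https://your-mcp-server/sse", "transport": "sse"}))
print(validate_remote_entry("gmap", {"transport": "grpc"}))
```

Running such a check at startup surfaces typos in transport names early, instead of at the first tool call.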

Basic MCP Integration

Single Tool Integration

from mrra.agents.builder import build_mrra_agent

# Simple weather-aware agent
reflection_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "weather_aware",
            "prompt": """Consider weather conditions in location selection:
            - Indoor locations during bad weather
            - Outdoor activities during good weather  
            - Weather-appropriate timing adjustments""",
            "mcp": {
                "weather": {}
            }
        },
        {
            "name": "baseline",
            "prompt": "Select based on historical patterns."
        }
    ],
    aggregator="confidence_weighted_voting"
)

# llm_cfg and retriever must already be defined; they are not created in this snippet
agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)

Multiple Tool Integration

# Multi-tool agent configuration
advanced_mcp_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "context_expert",
            "prompt": """Use all available external context:
            - Weather conditions for indoor/outdoor decisions
            - POI availability and business hours
            - Real-time map data for accessibility
            Integrate all contexts for optimal location selection.""",
            "mcp": {
                "weather": {},
                "maps": {},
                "gmap": {
                    "url": "https://your-map-service.com/mcp",
                    "transport": "sse"
                }
            }
        },
        {
            "name": "historical_expert", 
            "prompt": "Focus on historical patterns and user preferences."
        }
    ],
    aggregator="confidence_weighted_voting"
)

Advanced MCP Configuration

Custom MCP Server Integration

# Connect to custom MCP services
custom_mcp_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "traffic_aware",
            "prompt": "Consider real-time traffic conditions in route selection.",
            "mcp": {
                "traffic": {
                    "url": "wss://traffic-service.example.com/mcp",
                    "transport": "websocket",
                    "headers": {
                        "Authorization": "Bearer YOUR_TOKEN"
                    },
                    "timeout": 30
                }
            }
        },
        {
            "name": "events_aware",
            "prompt": "Consider local events and their impact on mobility.",
            "mcp": {
                "events": {
                    "url": "https://events-api.example.com/mcp/stream",
                    "transport": "streamable_http"
                }
            }
        }
    ]
)

Environment-Based Configuration

import os

# Environment-driven MCP configuration
def create_mcp_config():
    config = {}
    
    # Weather service
    if os.getenv('WEATHER_MCP_URL'):
        config['weather'] = {
            'url': os.getenv('WEATHER_MCP_URL'),
            'transport': os.getenv('WEATHER_TRANSPORT', 'sse'),
            'api_key': os.getenv('WEATHER_API_KEY')
        }
    else:
        config['weather'] = {}  # Use stub
    
    # Maps service
    if os.getenv('MAPS_MCP_URL'):
        config['maps'] = {
            'url': os.getenv('MAPS_MCP_URL'),
            'transport': os.getenv('MAPS_TRANSPORT', 'sse')
        }
    else:
        config['maps'] = {}  # Use stub
    
    return config

# Use in agent configuration
env_mcp_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "adaptive_expert",
            "prompt": "Adapt to available external services and contexts.",
            "mcp": create_mcp_config()
        }
    ]
)

Custom MCP Tool Development

Creating Custom Tools

# Custom MCP tool for public transport
def create_transport_tool():
    """Create public transport MCP tool"""
    
    def get_transport_options(location, destination, time):
        # Implementation for transport API
        return {
            'routes': [
                {
                    'mode': 'subway',
                    'duration': 25,
                    'cost': 3.50,
                    'transfers': 1
                },
                {
                    'mode': 'bus', 
                    'duration': 35,
                    'cost': 2.50,
                    'transfers': 0
                }
            ],
            'next_departure': '12:45',
            'service_status': 'normal'
        }
    
    def get_station_info(location):
        # Get nearby stations
        return {
            'nearest_stations': [
                {'name': 'Metro Station A', 'distance': 200, 'lines': ['Line 1', 'Line 2']},
                {'name': 'Bus Stop B', 'distance': 150, 'routes': ['Route 10', 'Route 15']}
            ]
        }
    
    return {
        'transport_routes': get_transport_options,
        'transport_stations': get_station_info
    }

# Register custom tool
transport_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "transport_expert",
            "prompt": """Use public transport information for location recommendations:
            - Consider transport accessibility
            - Factor in route efficiency
            - Account for service disruptions""",
            "mcp": {
                "transport": create_transport_tool()
            }
        }
    ]
)

MCP Tool Factory Pattern

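class MCPToolFactory:
    """Factory for creating and managing MCP tools.

    The connection and stub classes used below (SSEMCPConnection,
    WebSocketMCPConnection, HTTPMCPConnection, WeatherStub, MapsStub,
    GenericStub) are not defined on this page; treat them as placeholders.
    """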
    
    def __init__(self):
        self.tools = {}
        self.connections = {}
    
    def register_tool(self, name, tool_config):
        """Register a new MCP tool"""
        self.tools[name] = tool_config
    
    def create_connection(self, tool_name, config):
        """Create MCP connection for a tool"""
        if config.get('url'):
            # Remote MCP connection
            transport = config.get('transport', 'sse')
            connection = self._create_remote_connection(config['url'], transport, config)
        else:
            # Local tool stub
            connection = self._create_local_stub(tool_name)
        
        self.connections[tool_name] = connection
        return connection
    
    def _create_remote_connection(self, url, transport, config):
        """Create remote MCP connection"""
        if transport == 'sse':
            return SSEMCPConnection(url, config)
        elif transport == 'websocket':
            return WebSocketMCPConnection(url, config)
        elif transport == 'streamable_http':
            return HTTPMCPConnection(url, config)
        else:
            # Note: 'stdio' is listed as a supported transport but is not handled in this sketch
            raise ValueError(f"Unsupported transport: {transport}")
    
    def _create_local_stub(self, tool_name):
        """Create local tool stub"""
        if tool_name == 'weather':
            return WeatherStub()
        elif tool_name == 'maps':
            return MapsStub()
        else:
            return GenericStub(tool_name)
    
    def get_agent_tools(self, mcp_config):
        """Get tools for agent based on configuration"""
        agent_tools = {}
        
        for tool_name, tool_config in mcp_config.items():
            if tool_name not in self.connections:
                self.create_connection(tool_name, tool_config)
            
            agent_tools[tool_name] = self.connections[tool_name]
        
        return agent_tools

# Usage
tool_factory = MCPToolFactory()

# Register custom tools
tool_factory.register_tool('traffic', {
    'description': 'Real-time traffic information',
    'methods': ['get_traffic', 'get_incidents']
})

# Use in agent configuration
factory_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "comprehensive_expert",
            "prompt": "Use all available tools for comprehensive analysis.",
            "mcp": {
                "weather": {"url": "https://weather.example.com/mcp"},
                "traffic": {"url": "https://traffic.example.com/mcp"},
                "events": {}  # Local stub
            }
        }
    ]
)

MCP Error Handling

Robust Error Handling

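# Exception types used below; defined here because the snippet does not import them
class MCPTimeoutError(Exception):
    """Raised when an MCP tool call exceeds its timeout"""

class MCPConnectionError(Exception):
    """Raised when an MCP connection cannot be established"""

class ResilientMCPAgent:
    """Agent with robust MCP error handling"""
    
    def __init__(self, base_agent, agent_config, mcp_tools):
        self.base_agent = base_agent  # Agent that invoke_with_mcp() delegates to
        self.agent_config = agent_config
        self.mcp_tools = mcp_tools
        self.fallback_enabled = True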
    def invoke_with_mcp(self, query):
        """Invoke agent with MCP tools and fallback handling"""
        
        # Attempt to use MCP tools
        mcp_context = {}
        
        for tool_name, tool in self.mcp_tools.items():
            try:
                result = self._call_mcp_tool(tool, query, timeout=10)
                mcp_context[tool_name] = result
            except MCPTimeoutError:
                print(f"MCP tool {tool_name} timed out, continuing without it")
                mcp_context[tool_name] = None
            except MCPConnectionError:
                print(f"MCP tool {tool_name} connection failed, using fallback")
                mcp_context[tool_name] = self._get_fallback_data(tool_name, query)
            except Exception as e:
                print(f"Unexpected error with MCP tool {tool_name}: {e}")
                mcp_context[tool_name] = None
        
        # Enhance query with MCP context
        enhanced_query = {**query, 'mcp_context': mcp_context}
        
        # Execute agent with enhanced context
        return self.base_agent.invoke(enhanced_query)
    
    def _call_mcp_tool(self, tool, query, timeout=10):
        """Call MCP tool with timeout (SIGALRM: Unix only, main thread only)"""
        import signal
        
        def timeout_handler(signum, frame):
            raise MCPTimeoutError("MCP tool call timed out")
        
        previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(timeout)
        
        try:
            return tool.call(query)
        finally:
            signal.alarm(0)  # Cancel timeout on success and on error
            signal.signal(signal.SIGALRM, previous_handler)  # Restore previous handler
    
    def _get_fallback_data(self, tool_name, query):
        """Provide fallback data when MCP tools fail"""
        fallbacks = {
            'weather': {'condition': 'unknown', 'temperature': None},
            'maps': {'pois': [], 'status': 'unavailable'},
            'traffic': {'condition': 'unknown', 'delay': 0}
        }
        return fallbacks.get(tool_name, {'status': 'unavailable'})
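The `signal.alarm` approach only works on Unix and only in the main thread. Where that is a problem, one alternative is to run the call in a worker thread and bound the wait. The sketch below uses only the standard library; `call_with_timeout`, `slow_tool`, and `fast_tool` are hypothetical names for illustration, not MRRA APIs.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def call_with_timeout(func, *args, timeout=10.0, fallback=None):
    """Run func(*args) in a worker thread; return fallback if it takes too long."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args)
    try:
        return future.result(timeout=timeout)
    except FutureTimeout:
        # The worker thread cannot be killed; it finishes in the background.
        return fallback
    finally:
        # Do not block on a call that may still be running.
        executor.shutdown(wait=False)


def slow_tool(query):
    time.sleep(0.3)  # Stand-in for a hanging remote call
    return {"condition": "sunny"}


def fast_tool(query):
    return {"condition": "rain"}


fast_result = call_with_timeout(fast_tool, {"q": 1}, timeout=1.0)
slow_result = call_with_timeout(
    slow_tool, {"q": 1}, timeout=0.05, fallback={"condition": "unknown"}
)
print(fast_result, slow_result)
```

Errors raised by the tool itself still propagate, so the caller can handle connection failures separately from timeouts, as `invoke_with_mcp` does.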

Performance and Optimization

MCP Connection Pooling

class MCPConnectionPool:
    """Connection pool for MCP services"""
    
    def __init__(self, max_connections=10):
        self.max_connections = max_connections
        self.pools = {}
    
    def get_connection(self, service_url, transport='sse'):
        """Get or create connection from pool"""
        pool_key = f"{service_url}_{transport}"
        
        if pool_key not in self.pools:
            self.pools[pool_key] = []
        
        pool = self.pools[pool_key]
        
        # Reuse existing connection if available
        for conn in pool:
            if conn.is_available():
                return conn
        
        # Create new connection if pool not full
        if len(pool) < self.max_connections:
            conn = self._create_connection(service_url, transport)
            pool.append(conn)
            return conn
        
        # Wait for available connection
        return self._wait_for_connection(pool)
    
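    def _create_connection(self, url, transport):
        """Create new MCP connection"""
        # Implementation depends on transport type. Raise instead of silently
        # returning None, which would put an unusable entry into the pool.
        raise NotImplementedError("Provide a connection factory for your transport")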
    
    def _wait_for_connection(self, pool):
        """Wait for available connection in pool"""
        import time
        
        for _ in range(50):  # Max 5 seconds wait
            for conn in pool:
                if conn.is_available():
                    return conn
            time.sleep(0.1)
        
        raise MCPConnectionError("No available connections in pool")

# Use connection pool
connection_pool = MCPConnectionPool(max_connections=5)

# In agent configuration
pooled_mcp_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "pooled_expert",
            "mcp": {
                "weather": {
                    "url": "https://weather.example.com/mcp",
                    "connection_pool": connection_pool
                }
            }
        }
    ]
)
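The pool above hands out connections without any locking, so two threads could receive the same connection. Under concurrent use, a queue-based pool is one possible alternative. This is a sketch under stated assumptions: `QueuePool` and its `factory` argument are hypothetical, connections are created eagerly, and nothing here is an MRRA API.

```python
import queue
from contextlib import contextmanager


class QueuePool:
    """Hypothetical thread-safe pool; `factory` creates one connection object."""

    def __init__(self, factory, max_connections=5):
        self._idle = queue.Queue(maxsize=max_connections)
        for _ in range(max_connections):
            self._idle.put(factory())  # Eagerly create all connections

    @contextmanager
    def connection(self, wait_seconds=5.0):
        # queue.Empty is raised if nothing becomes free within wait_seconds.
        conn = self._idle.get(timeout=wait_seconds)
        try:
            yield conn
        finally:
            self._idle.put(conn)  # Always return the connection to the pool


# object() stands in for a real connection in this demo.
pool = QueuePool(factory=lambda: object(), max_connections=2)
with pool.connection() as conn:
    idle_while_in_use = pool._idle.qsize()
idle_after_release = pool._idle.qsize()
print(idle_while_in_use, idle_after_release)
```

`queue.Queue` does its own locking, so each connection is held by at most one caller at a time, and the context manager returns it even if the tool call raises.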

Dependencies: For full MCP functionality, install: pip install langchain-mcp-adapters mcp anyio httpx-sse

The system gracefully falls back to local stubs when MCP services are unavailable.
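To see in advance whether remote MCP tools can work in a given environment, the optional packages can be probed without importing them. This is a minimal sketch: the import names in `OPTIONAL_MODULES` are assumed to correspond to the pip packages listed above, and `missing_mcp_dependencies` is a hypothetical helper, not an MRRA function.

```python
import importlib.util

# Assumed mapping from pip package name to importable module name.
OPTIONAL_MODULES = {
    "langchain-mcp-adapters": "langchain_mcp_adapters",
    "mcp": "mcp",
    "anyio": "anyio",
    "httpx-sse": "httpx_sse",
}


def missing_mcp_dependencies():
    """Return the pip package names whose modules cannot be found."""
    return [
        package
        for package, module in OPTIONAL_MODULES.items()
        if importlib.util.find_spec(module) is None
    ]


missing = missing_mcp_dependencies()
if missing:
    print("Optional MCP packages not found; install:", " ".join(missing))
else:
    print("All optional MCP dependencies are installed")
```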

Debugging MCP Integration

MCP Tool Testing

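def test_mcp_tools(mcp_config):
    """Test MCP tool connectivity and functionality.

    test_remote_mcp_tool and test_local_mcp_tool are helper functions that
    are not defined on this page; each is expected to return a dict with
    optional 'latency' (ms) and 'capabilities' keys.
    """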
    
    results = {}
    
    for tool_name, config in mcp_config.items():
        print(f"Testing MCP tool: {tool_name}")
        
        try:
            if config.get('url'):
                # Test remote connection
                result = test_remote_mcp_tool(config['url'], config.get('transport', 'sse'))
            else:
                # Test local stub
                result = test_local_mcp_tool(tool_name)
            
            results[tool_name] = {
                'status': 'success',
                'latency': result.get('latency', 0),
                'capabilities': result.get('capabilities', [])
            }
            print(f"  ✅ {tool_name}: OK (latency: {result.get('latency', 0)}ms)")
            
        except Exception as e:
            results[tool_name] = {
                'status': 'error',
                'error': str(e)
            }
            print(f"  ❌ {tool_name}: ERROR - {e}")
    
    return results

# Test configuration
test_results = test_mcp_tools({
    'weather': {'url': 'https://weather.example.com/mcp'},
    'maps': {},  # Local stub
    'traffic': {'url': 'wss://traffic.example.com/mcp', 'transport': 'websocket'}
})

print("MCP Test Results:", test_results)
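The test function above reads `latency` and `capabilities` from each helper's result. A probe that produces a dict in that shape might look like the following sketch. `probe_tool` and `fake_weather_stub` are hypothetical names, and the way a real MRRA tool is called is an assumption.

```python
import time


def probe_tool(call, probe_query, capabilities=()):
    """Time one call and return a result dict with 'latency' (ms) and 'capabilities'."""
    start = time.perf_counter()
    call(probe_query)  # Exceptions propagate so the caller can record the error
    elapsed_ms = (time.perf_counter() - start) * 1000
    return {"latency": round(elapsed_ms, 1), "capabilities": list(capabilities)}


# Stand-in for a local stub; a real tool would be passed in the same way.
def fake_weather_stub(query):
    return {"condition": "unknown"}


result = probe_tool(
    fake_weather_stub, {"lat": 0.0, "lon": 0.0}, capabilities=["current_weather"]
)
print(result["capabilities"], result["latency"] >= 0)
```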

Production Considerations:

  • Implement proper authentication for remote MCP services
  • Set appropriate timeouts to prevent hanging
  • Use connection pooling for high-throughput scenarios
  • Monitor MCP service health and implement fallback strategies
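For the authentication and timeout points, one option is to keep tokens out of source code and build the remote entry from the environment. The sketch below follows the `url` / `transport` / `headers` / `timeout` shape used in the custom server example earlier. `remote_entry_from_env` and the variable name `TRAFFIC_MCP_TOKEN` are hypothetical.

```python
import os


def remote_entry_from_env(url, transport, token_var, timeout=30):
    """Build a remote MCP tool entry, reading the bearer token from the environment."""
    token = os.environ.get(token_var)
    if not token:
        # Fail fast rather than sending unauthenticated requests.
        raise RuntimeError(f"Environment variable {token_var} is not set")
    return {
        "url": url,
        "transport": transport,
        "headers": {"Authorization": f"Bearer {token}"},
        "timeout": timeout,
    }


# Demo only: in production the variable would be set outside the program.
os.environ.setdefault("TRAFFIC_MCP_TOKEN", "example-token")
entry = remote_entry_from_env(
    "wss://traffic-service.example.com/mcp", "websocket", "TRAFFIC_MCP_TOKEN"
)
print(entry["transport"], entry["timeout"])
```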
