MRRA LogoMRRA
高级功能

MCP集成

模型上下文协议集成,用于外部工具和服务

MCP集成

MCP(模型上下文协议)使MRRA智能体能够安全访问外部能力和实时数据源。MRRA提供三个MCP工具类别用于增强移动性预测。

可用的MCP工具

MRRA包含多个内置MCP工具并支持自定义集成:

内置工具

目的:为移动性决策提供天气上下文

功能

  • 当前天气条件
  • 天气对移动模式的影响
  • 室内/室外活动建议

配置

"mcp": {
    "weather": {}  # 启用天气工具(离线存根)
}

使用示例

# 天气感知智能体配置
{
    "name": "weather_expert",
    "prompt": "在选择位置时考虑天气条件。在恶劣天气下优先选择室内位置。",
    "mcp": {
        "weather": {}
    }
}

目的:附近POI发现和分析

功能

  • 兴趣点(POI)查找
  • 营业时间和可用性
  • 基于类别的过滤

配置

"mcp": {
    "maps": {}  # 启用POI工具(离线存根)
}

使用示例

# POI感知智能体配置
{
    "name": "poi_expert", 
    "prompt": "使用附近POI信息来基于营业时间和类别选择合适的位置。",
    "mcp": {
        "maps": {}
    }
}

目的:远程MCP地图服务

功能

  • 实时地图数据
  • 多种传输协议
  • 自定义地图服务集成

配置

"mcp": {
    "gmap": {
        "url": "https://your-mcp-server/sse",
        "transport": "sse"  # sse / websocket / streamable_http / stdio
    }
}

支持的传输协议

  • sse:服务器发送事件
  • websocket:WebSocket连接
  • streamable_http:HTTP流式传输
  • stdio:标准输入/输出

基本MCP集成

单工具集成

from mrra.agents.builder import build_mrra_agent

# 简单的天气感知智能体
reflection_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "weather_aware",
            "prompt": """在位置选择中考虑天气条件:
            - 恶劣天气时选择室内位置
            - 好天气时进行室外活动  
            - 根据天气调整适宜的时间""",
            "mcp": {
                "weather": {}
            }
        },
        {
            "name": "baseline",
            "prompt": "基于历史模式进行选择。"
        }
    ],
    aggregator="confidence_weighted_voting"
)

agent = build_mrra_agent(llm=llm_cfg, retriever=retriever, reflection=reflection_cfg)

多工具集成

# 多工具智能体配置
advanced_mcp_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "context_expert",
            "prompt": """使用所有可用的外部上下文:
            - 天气条件用于室内/室外决策
            - POI可用性和营业时间
            - 实时地图数据用于可达性
            整合所有上下文进行最优位置选择。""",
            "mcp": {
                "weather": {},
                "maps": {},
                "gmap": {
                    "url": "https://your-map-service.com/mcp",
                    "transport": "sse"
                }
            }
        },
        {
            "name": "historical_expert", 
            "prompt": "专注于历史模式和用户偏好。"
        }
    ],
    aggregator="confidence_weighted_voting"
)

高级MCP配置

自定义MCP服务器集成

# 连接到自定义MCP服务
custom_mcp_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "traffic_aware",
            "prompt": "在路线选择中考虑实时交通条件。",
            "mcp": {
                "traffic": {
                    "url": "wss://traffic-service.example.com/mcp",
                    "transport": "websocket",
                    "headers": {
                        "Authorization": "Bearer YOUR_TOKEN"
                    },
                    "timeout": 30
                }
            }
        },
        {
            "name": "events_aware",
            "prompt": "考虑当地活动及其对移动性的影响。",
            "mcp": {
                "events": {
                    "url": "https://events-api.example.com/mcp/stream",
                    "transport": "streamable_http"
                }
            }
        }
    ]
)

自定义MCP工具开发

创建自定义工具

# 公共交通的自定义MCP工具
def create_transport_tool():
    """创建公共交通MCP工具"""
    
    def get_transport_options(location, destination, time):
        # 交通API的实现
        return {
            'routes': [
                {
                    'mode': 'subway',
                    'duration': 25,
                    'cost': 3.50,
                    'transfers': 1
                },
                {
                    'mode': 'bus', 
                    'duration': 35,
                    'cost': 2.50,
                    'transfers': 0
                }
            ],
            'next_departure': '12:45',
            'service_status': 'normal'
        }
    
    def get_station_info(location):
        # 获取附近站点
        return {
            'nearest_stations': [
                {'name': '地铁站A', 'distance': 200, 'lines': ['线路1', '线路2']},
                {'name': '公交站B', 'distance': 150, 'routes': ['路线10', '路线15']}
            ]
        }
    
    return {
        'transport_routes': get_transport_options,
        'transport_stations': get_station_info
    }

# 注册自定义工具
transport_cfg = dict(
    max_round=1,
    subAgents=[
        {
            "name": "transport_expert",
            "prompt": """使用公共交通信息进行位置推荐:
            - 考虑交通可达性
            - 考虑路线效率
            - 考虑服务中断""",
            "mcp": {
                "transport": create_transport_tool()
            }
        }
    ]
)

MCP错误处理

稳健的错误处理

class ResilientMCPAgent:
    """带有稳健MCP错误处理的智能体"""
    
    def __init__(self, agent_config, mcp_tools):
        self.agent_config = agent_config
        self.mcp_tools = mcp_tools
        self.fallback_enabled = True
    
    def invoke_with_mcp(self, query):
        """使用MCP工具和备用处理调用智能体"""
        
        # 尝试使用MCP工具
        mcp_context = {}
        
        for tool_name, tool in self.mcp_tools.items():
            try:
                result = self._call_mcp_tool(tool, query, timeout=10)
                mcp_context[tool_name] = result
            except MCPTimeoutError:
                print(f"MCP工具{tool_name}超时,继续不使用它")
                mcp_context[tool_name] = None
            except MCPConnectionError:
                print(f"MCP工具{tool_name}连接失败,使用备用")
                mcp_context[tool_name] = self._get_fallback_data(tool_name, query)
            except Exception as e:
                print(f"MCP工具{tool_name}出现意外错误: {e}")
                mcp_context[tool_name] = None
        
        # 使用MCP上下文增强查询
        enhanced_query = {**query, 'mcp_context': mcp_context}
        
        # 使用增强上下文执行智能体
        return self.base_agent.invoke(enhanced_query)
    
    def _get_fallback_data(self, tool_name, query):
        """当MCP工具失败时提供备用数据"""
        fallbacks = {
            'weather': {'condition': 'unknown', 'temperature': None},
            'maps': {'pois': [], 'status': 'unavailable'},
            'traffic': {'condition': 'unknown', 'delay': 0}
        }
        return fallbacks.get(tool_name, {'status': 'unavailable'})

依赖项:为了获得完整的MCP功能,安装:pip install langchain-mcp-adapters mcp anyio httpx-sse

当MCP服务不可用时,系统会优雅地回退到本地存根。

调试MCP集成

MCP工具测试

def test_mcp_tools(mcp_config):
    """测试MCP工具连接和功能"""
    
    results = {}
    
    for tool_name, config in mcp_config.items():
        print(f"正在测试MCP工具: {tool_name}")
        
        try:
            if config.get('url'):
                # 测试远程连接
                result = test_remote_mcp_tool(config['url'], config.get('transport', 'sse'))
            else:
                # 测试本地存根
                result = test_local_mcp_tool(tool_name)
            
            results[tool_name] = {
                'status': 'success',
                'latency': result.get('latency', 0),
                'capabilities': result.get('capabilities', [])
            }
            print(f"  ✅ {tool_name}: OK (延迟: {result.get('latency', 0)}ms)")
            
        except Exception as e:
            results[tool_name] = {
                'status': 'error',
                'error': str(e)
            }
            print(f"  ❌ {tool_name}: 错误 - {e}")
    
    return results

# 测试配置
test_results = test_mcp_tools({
    'weather': {'url': 'https://weather.example.com/mcp'},
    'maps': {},  # 本地存根
    'traffic': {'url': 'wss://traffic.example.com/mcp', 'transport': 'websocket'}
})

print("MCP测试结果:", test_results)

生产环境考虑

  • 为远程MCP服务实施适当的身份验证
  • 设置适当的超时时间以防止挂起
  • 为高吞吐量场景使用连接池
  • 监控MCP服务健康状况并实施备用策略

下一步