| 1234567891011121314151617181920212223242526272829303132333435363738394041 |
- #!/usr/bin/env python3
- """测试天气查询 MCP 服务器"""
- import asyncio
- import json
- import os
- from hello_agents.protocols import MCPClient
- async def test_weather_server():
- server_script = os.path.join(os.path.dirname(__file__), "14_weather_mcp_server.py")
- client = MCPClient(["python", server_script])
- try:
- async with client:
- # 测试1: 获取服务器信息
- info = json.loads(await client.call_tool("get_server_info", {}))
- print(f"服务器: {info['name']} v{info['version']}")
- # 测试2: 列出支持的城市
- cities = json.loads(await client.call_tool("list_supported_cities", {}))
- print(f"支持城市: {cities['count']} 个")
- # 测试3: 查询北京天气
- weather = json.loads(await client.call_tool("get_weather", {"city": "北京"}))
- if "error" not in weather:
- print(f"\n北京天气: {weather['temperature']}°C, {weather['condition']}")
- # 测试4: 查询深圳天气
- weather = json.loads(await client.call_tool("get_weather", {"city": "深圳"}))
- if "error" not in weather:
- print(f"深圳天气: {weather['temperature']}°C, {weather['condition']}")
- print("\n✅ 所有测试完成!")
- except Exception as e:
- print(f"❌ 测试失败: {e}")
- if __name__ == "__main__":
- asyncio.run(test_weather_server())
|