1
0

14_test_weather_server.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. #!/usr/bin/env python3
  2. """测试天气查询 MCP 服务器"""
  3. import asyncio
  4. import json
  5. import os
  6. from hello_agents.protocols import MCPClient
  7. async def test_weather_server():
  8. server_script = os.path.join(os.path.dirname(__file__), "14_weather_mcp_server.py")
  9. client = MCPClient(["python", server_script])
  10. try:
  11. async with client:
  12. # 测试1: 获取服务器信息
  13. info = json.loads(await client.call_tool("get_server_info", {}))
  14. print(f"服务器: {info['name']} v{info['version']}")
  15. # 测试2: 列出支持的城市
  16. cities = json.loads(await client.call_tool("list_supported_cities", {}))
  17. print(f"支持城市: {cities['count']} 个")
  18. # 测试3: 查询北京天气
  19. weather = json.loads(await client.call_tool("get_weather", {"city": "北京"}))
  20. if "error" not in weather:
  21. print(f"\n北京天气: {weather['temperature']}°C, {weather['condition']}")
  22. # 测试4: 查询深圳天气
  23. weather = json.loads(await client.call_tool("get_weather", {"city": "深圳"}))
  24. if "error" not in weather:
  25. print(f"深圳天气: {weather['temperature']}°C, {weather['condition']}")
  26. print("\n✅ 所有测试完成!")
  27. except Exception as e:
  28. print(f"❌ 测试失败: {e}")
  29. if __name__ == "__main__":
  30. asyncio.run(test_weather_server())