| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- """地图服务API路由"""
- from fastapi import APIRouter, HTTPException, Query
- from typing import Optional
- from ...models.schemas import (
- POISearchRequest,
- POISearchResponse,
- RouteRequest,
- RouteResponse,
- WeatherResponse
- )
- from ...services.amap_service import get_amap_service
- router = APIRouter(prefix="/map", tags=["地图服务"])
- @router.get(
- "/poi",
- response_model=POISearchResponse,
- summary="搜索POI",
- description="根据关键词搜索POI(兴趣点)"
- )
- async def search_poi(
- keywords: str = Query(..., description="搜索关键词", example="故宫"),
- city: str = Query(..., description="城市", example="北京"),
- citylimit: bool = Query(True, description="是否限制在城市范围内")
- ):
- """
- 搜索POI
-
- Args:
- keywords: 搜索关键词
- city: 城市
- citylimit: 是否限制在城市范围内
-
- Returns:
- POI搜索结果
- """
- try:
- # 获取服务实例
- service = get_amap_service()
-
- # 搜索POI
- pois = service.search_poi(keywords, city, citylimit)
-
- return POISearchResponse(
- success=True,
- message="POI搜索成功",
- data=pois
- )
-
- except Exception as e:
- print(f"❌ POI搜索失败: {str(e)}")
- raise HTTPException(
- status_code=500,
- detail=f"POI搜索失败: {str(e)}"
- )
- @router.get(
- "/weather",
- response_model=WeatherResponse,
- summary="查询天气",
- description="查询指定城市的天气信息"
- )
- async def get_weather(
- city: str = Query(..., description="城市名称", example="北京")
- ):
- """
- 查询天气
-
- Args:
- city: 城市名称
-
- Returns:
- 天气信息
- """
- try:
- # 获取服务实例
- service = get_amap_service()
-
- # 查询天气
- weather_info = service.get_weather(city)
-
- return WeatherResponse(
- success=True,
- message="天气查询成功",
- data=weather_info
- )
-
- except Exception as e:
- print(f"❌ 天气查询失败: {str(e)}")
- raise HTTPException(
- status_code=500,
- detail=f"天气查询失败: {str(e)}"
- )
- @router.post(
- "/route",
- response_model=RouteResponse,
- summary="规划路线",
- description="规划两点之间的路线"
- )
- async def plan_route(request: RouteRequest):
- """
- 规划路线
-
- Args:
- request: 路线规划请求
-
- Returns:
- 路线信息
- """
- try:
- # 获取服务实例
- service = get_amap_service()
-
- # 规划路线
- route_info = service.plan_route(
- origin_address=request.origin_address,
- destination_address=request.destination_address,
- origin_city=request.origin_city,
- destination_city=request.destination_city,
- route_type=request.route_type
- )
-
- return RouteResponse(
- success=True,
- message="路线规划成功",
- data=route_info
- )
-
- except Exception as e:
- print(f"❌ 路线规划失败: {str(e)}")
- raise HTTPException(
- status_code=500,
- detail=f"路线规划失败: {str(e)}"
- )
- @router.get(
- "/health",
- summary="健康检查",
- description="检查地图服务是否正常"
- )
- async def health_check():
- """健康检查"""
- try:
- # 检查服务是否可用
- service = get_amap_service()
-
- return {
- "status": "healthy",
- "service": "map-service",
- "mcp_tools_count": len(service.mcp_tool._available_tools)
- }
- except Exception as e:
- raise HTTPException(
- status_code=503,
- detail=f"服务不可用: {str(e)}"
- )
|