""" API客户端模块 用于与外部API交互 """ import requests from typing import Dict, Any, Optional class APIClient: """API客户端基类""" def __init__(self, base_url: str, api_key: Optional[str] = None): """ 初始化API客户端 Args: base_url: API基础URL api_key: API密钥 """ self.base_url = base_url self.api_key = api_key self.session = requests.Session() if api_key: self.session.headers.update({ 'Authorization': f'Bearer {api_key}' }) def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ 发送GET请求 Args: endpoint: API端点 params: 查询参数 Returns: 响应数据 """ # TODO: 添加重试逻辑 url = f"{self.base_url}/{endpoint}" response = self.session.get(url, params=params) response.raise_for_status() return response.json() def post(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]: """ 发送POST请求 Args: endpoint: API端点 data: 请求数据 Returns: 响应数据 """ # TODO: 添加错误处理 url = f"{self.base_url}/{endpoint}" response = self.session.post(url, json=data) response.raise_for_status() return response.json() def put(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]: """ 发送PUT请求 Args: endpoint: API端点 data: 请求数据 Returns: 响应数据 """ url = f"{self.base_url}/{endpoint}" response = self.session.put(url, json=data) response.raise_for_status() return response.json() def delete(self, endpoint: str) -> None: """ 发送DELETE请求 Args: endpoint: API端点 """ # TODO: 添加确认机制 url = f"{self.base_url}/{endpoint}" response = self.session.delete(url) response.raise_for_status()