tasks.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. """
  2. 任务相关API路由
  3. """
  4. from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
  5. from typing import List, Dict, Any, Optional
  6. from pydantic import BaseModel
  7. import logging
  8. import json
  9. import asyncio
# from ...agents.controller import agent_controller, TaskType
# Temporarily commented out to avoid a relative-import error.
# NOTE(review): while both names are bound to None, every endpoint that
# touches agent_controller or TaskType fails at call time; restore the real
# import before relying on these routes.
agent_controller = None
TaskType = None
# Module-level logger and the router that all endpoints below attach to.
logger = logging.getLogger(__name__)
router = APIRouter()
  16. # Pydantic模型
class TaskSubmitRequest(BaseModel):
    """Request body accepted by the task submission endpoint."""

    # Name of the task type; the submit endpoint converts it with TaskType(...).
    task_type: str
    # Payload handed to the agent controller unchanged.
    input_data: Dict[str, Any]
    # Priority passed through to the controller; defaults to 0.
    priority: int = 0
  21. class TaskResponse(BaseModel):
  22. id: str
  23. type: str
  24. status: str
  25. created_at: str
  26. started_at: Optional[str]
  27. completed_at: Optional[str]
  28. priority: int
  29. # WebSocket连接管理
  30. class ConnectionManager:
  31. def __init__(self):
  32. self.active_connections: List[WebSocket] = []
  33. async def connect(self, websocket: WebSocket):
  34. await websocket.accept()
  35. self.active_connections.append(websocket)
  36. def disconnect(self, websocket: WebSocket):
  37. self.active_connections.remove(websocket)
  38. async def send_personal_message(self, message: str, websocket: WebSocket):
  39. await websocket.send_text(message)
  40. async def broadcast(self, message: str):
  41. for connection in self.active_connections:
  42. try:
  43. await connection.send_text(message)
  44. except:
  45. # 连接已断开,移除
  46. self.active_connections.remove(connection)
  47. manager = ConnectionManager()
  48. @router.post("/submit", response_model=Dict[str, Any])
  49. async def submit_task(request: TaskSubmitRequest):
  50. """提交任务"""
  51. try:
  52. # 验证任务类型
  53. try:
  54. task_type = TaskType(request.task_type)
  55. except ValueError:
  56. raise HTTPException(status_code=400, detail=f"不支持的任务类型: {request.task_type}")
  57. # 提交任务
  58. task_id = await agent_controller.submit_task(
  59. task_type=task_type,
  60. input_data=request.input_data,
  61. priority=request.priority
  62. )
  63. return {
  64. "success": True,
  65. "task_id": task_id,
  66. "message": "任务已提交"
  67. }
  68. except HTTPException:
  69. raise
  70. except Exception as e:
  71. logger.error(f"提交任务失败: {str(e)}")
  72. raise HTTPException(status_code=500, detail=str(e))
  73. @router.get("/{task_id}/execute", response_model=Dict[str, Any])
  74. async def execute_task(task_id: str):
  75. """执行任务"""
  76. try:
  77. result = await agent_controller.execute_task(task_id)
  78. return {
  79. "success": True,
  80. "task_id": task_id,
  81. "result": result
  82. }
  83. except Exception as e:
  84. logger.error(f"执行任务失败: {str(e)}")
  85. raise HTTPException(status_code=500, detail=str(e))
  86. @router.get("/{task_id}/status", response_model=TaskResponse)
  87. async def get_task_status(task_id: str):
  88. """获取任务状态"""
  89. try:
  90. status = await agent_controller.get_task_status(task_id)
  91. if not status:
  92. raise HTTPException(status_code=404, detail="任务不存在")
  93. return TaskResponse(**status)
  94. except HTTPException:
  95. raise
  96. except Exception as e:
  97. logger.error(f"获取任务状态失败: {str(e)}")
  98. raise HTTPException(status_code=500, detail=str(e))
  99. @router.delete("/{task_id}", response_model=Dict[str, Any])
  100. async def cancel_task(task_id: str):
  101. """取消任务"""
  102. try:
  103. success = await agent_controller.cancel_task(task_id)
  104. if success:
  105. return {"success": True, "message": "任务已取消"}
  106. else:
  107. return {"success": False, "message": "任务无法取消(可能正在执行或已完成)"}
  108. except Exception as e:
  109. logger.error(f"取消任务失败: {str(e)}")
  110. raise HTTPException(status_code=500, detail=str(e))
  111. @router.get("/", response_model=List[TaskResponse])
  112. async def list_tasks():
  113. """获取任务列表"""
  114. try:
  115. # 获取活跃任务
  116. active_tasks = []
  117. for task_id, task in agent_controller.active_tasks.items():
  118. active_tasks.append(TaskResponse(
  119. id=task["id"],
  120. type=task["type"].value,
  121. status=task["status"].value,
  122. created_at=task["created_at"].isoformat(),
  123. started_at=task["started_at"].isoformat() if task["started_at"] else None,
  124. completed_at=task["completed_at"].isoformat() if task["completed_at"] else None,
  125. priority=task["priority"]
  126. ))
  127. # 获取历史任务(最近50个)
  128. history_tasks = []
  129. for task in agent_controller.task_history[-50:]:
  130. history_tasks.append(TaskResponse(
  131. id=task["id"],
  132. type=task["type"].value,
  133. status=task["status"].value,
  134. created_at=task["created_at"].isoformat(),
  135. started_at=task["started_at"].isoformat() if task["started_at"] else None,
  136. completed_at=task["completed_at"].isoformat() if task["completed_at"] else None,
  137. priority=task["priority"]
  138. ))
  139. return active_tasks + history_tasks
  140. except Exception as e:
  141. logger.error(f"获取任务列表失败: {str(e)}")
  142. raise HTTPException(status_code=500, detail=str(e))
  143. @router.get("/agents/status", response_model=Dict[str, Any])
  144. async def get_agents_status():
  145. """获取智能体状态"""
  146. try:
  147. status = await agent_controller.get_agent_status()
  148. return {
  149. "success": True,
  150. "agents": status
  151. }
  152. except Exception as e:
  153. logger.error(f"获取智能体状态失败: {str(e)}")
  154. raise HTTPException(status_code=500, detail=str(e))
  155. @router.post("/workflow/full", response_model=Dict[str, Any])
  156. async def run_full_workflow(input_data: Dict[str, Any]):
  157. """运行完整工作流"""
  158. try:
  159. # 提交完整工作流任务
  160. task_id = await agent_controller.submit_task(
  161. task_type=TaskType.FULL_WORKFLOW,
  162. input_data=input_data,
  163. priority=1 # 高优先级
  164. )
  165. # 执行任务
  166. result = await agent_controller.execute_task(task_id)
  167. return {
  168. "success": True,
  169. "task_id": task_id,
  170. "result": result
  171. }
  172. except Exception as e:
  173. logger.error(f"运行完整工作流失败: {str(e)}")
  174. raise HTTPException(status_code=500, detail=str(e))
  175. @router.websocket("/ws/{task_id}")
  176. async def websocket_task_updates(websocket: WebSocket, task_id: str):
  177. """WebSocket任务更新"""
  178. await manager.connect(websocket)
  179. try:
  180. # 发送初始状态
  181. status = await agent_controller.get_task_status(task_id)
  182. if status:
  183. await manager.send_personal_message(
  184. json.dumps({"type": "status", "data": status}),
  185. websocket
  186. )
  187. # 监听任务状态变化
  188. while True:
  189. await asyncio.sleep(1) # 每秒检查一次
  190. status = await agent_controller.get_task_status(task_id)
  191. if status:
  192. await manager.send_personal_message(
  193. json.dumps({"type": "status", "data": status}),
  194. websocket
  195. )
  196. # 如果任务完成,断开连接
  197. if status["status"] in ["completed", "failed", "cancelled"]:
  198. break
  199. except WebSocketDisconnect:
  200. manager.disconnect(websocket)
  201. except Exception as e:
  202. logger.error(f"WebSocket连接异常: {str(e)}")
  203. manager.disconnect(websocket)
  204. @router.websocket("/ws/stream")
  205. async def websocket_stream(websocket: WebSocket):
  206. """WebSocket流式通信(用于写作助教等实时交互)"""
  207. await manager.connect(websocket)
  208. try:
  209. while True:
  210. # 接收消息
  211. data = await websocket.receive_text()
  212. message = json.loads(data)
  213. # 处理不同类型的消息
  214. if message.get("type") == "writing_assistance":
  215. # 处理写作辅助请求
  216. await handle_writing_assistance(websocket, message.get("data", {}))
  217. elif message.get("type") == "ping":
  218. # 心跳检测
  219. await manager.send_personal_message(
  220. json.dumps({"type": "pong"}),
  221. websocket
  222. )
  223. except WebSocketDisconnect:
  224. manager.disconnect(websocket)
  225. except Exception as e:
  226. logger.error(f"WebSocket流式通信异常: {str(e)}")
  227. manager.disconnect(websocket)
  228. async def handle_writing_assistance(websocket: WebSocket, data: Dict[str, Any]):
  229. """处理写作辅助请求"""
  230. try:
  231. # 提交写作辅助任务
  232. task_id = await agent_controller.submit_task(
  233. task_type=TaskType.WRITING_ASSISTANCE,
  234. input_data=data
  235. )
  236. # 发送任务ID
  237. await manager.send_personal_message(
  238. json.dumps({"type": "task_started", "task_id": task_id}),
  239. websocket
  240. )
  241. # 执行任务
  242. result = await agent_controller.execute_task(task_id)
  243. # 发送结果
  244. await manager.send_personal_message(
  245. json.dumps({"type": "task_completed", "result": result}),
  246. websocket
  247. )
  248. except Exception as e:
  249. await manager.send_personal_message(
  250. json.dumps({"type": "error", "message": str(e)}),
  251. websocket
  252. )