workflow.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. """
  2. 工作流API路由 - 协调多个智能体完成复杂任务
  3. """
import asyncio
import logging
import uuid
from typing import Dict, Any, Optional, List

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from agents.controller import agent_controller
  10. logger = logging.getLogger(__name__)
  11. router = APIRouter()
  12. # Pydantic模型
  13. class WorkflowRequest(BaseModel):
  14. keywords: str
  15. analysis_type: str = "summary" # summary, innovation, comparison, comprehensive
  16. citation_format: str = "bibtex" # bibtex, apa, ieee, mla
  17. writing_task: Optional[str] = None # improve, polish, translate
  18. limit: int = 5 # 搜索论文数量
  19. class WorkflowStatus(BaseModel):
  20. workflow_id: str
  21. status: str # running, completed, failed
  22. current_step: str
  23. progress: int # 0-100
  24. @router.post("/complete", response_model=Dict[str, Any])
  25. async def complete_workflow(request: WorkflowRequest):
  26. """
  27. 完整工作流:搜索 -> 分析 -> 校验引用 -> 写作辅助
  28. 自动协调所有智能体完成任务
  29. """
  30. try:
  31. workflow_id = f"workflow_{asyncio.get_event_loop().time()}"
  32. results = {
  33. "workflow_id": workflow_id,
  34. "status": "running",
  35. "steps": []
  36. }
  37. # 步骤 1: Hunter - 搜索论文
  38. logger.info(f"[工作流 {workflow_id}] 步骤 1/4: 搜索论文")
  39. try:
  40. from api.routes.papers import search_papers, PaperSearchRequest
  41. search_result = await search_papers(PaperSearchRequest(
  42. keywords=request.keywords,
  43. source="arxiv",
  44. limit=request.limit
  45. ))
  46. papers = search_result.get("papers", [])
  47. results["steps"].append({
  48. "step": 1,
  49. "name": "Hunter - 论文搜索",
  50. "status": "completed",
  51. "result": {
  52. "total_found": len(papers),
  53. "papers": papers
  54. }
  55. })
  56. if not papers:
  57. raise HTTPException(status_code=404, detail="未找到相关论文")
  58. except Exception as e:
  59. logger.error(f"论文搜索失败: {str(e)}")
  60. results["steps"].append({
  61. "step": 1,
  62. "name": "Hunter - 论文搜索",
  63. "status": "failed",
  64. "error": str(e)
  65. })
  66. results["status"] = "failed"
  67. return results
  68. # 步骤 2: Miner - 分析每篇论文
  69. logger.info(f"[工作流 {workflow_id}] 步骤 2/4: 分析论文")
  70. analyses = []
  71. try:
  72. from api.routes.analysis import analyze_paper, PaperAnalysisRequest
  73. # 分析前3篇论文
  74. for i, paper in enumerate(papers[:3]):
  75. try:
  76. analysis_result = await analyze_paper(PaperAnalysisRequest(
  77. paper_url=paper["url"],
  78. analysis_type=request.analysis_type
  79. ))
  80. analyses.append({
  81. "paper_id": paper["id"],
  82. "title": paper["title"],
  83. "analysis": analysis_result.get("analysis", "")
  84. })
  85. except Exception as e:
  86. logger.warning(f"分析论文 {paper['id']} 失败: {str(e)}")
  87. continue
  88. results["steps"].append({
  89. "step": 2,
  90. "name": "Miner - 论文分析",
  91. "status": "completed",
  92. "result": {
  93. "total_analyzed": len(analyses),
  94. "analyses": analyses
  95. }
  96. })
  97. except Exception as e:
  98. logger.error(f"论文分析失败: {str(e)}")
  99. results["steps"].append({
  100. "step": 2,
  101. "name": "Miner - 论文分析",
  102. "status": "failed",
  103. "error": str(e)
  104. })
  105. # 步骤 3: Validator - 生成和校验引用
  106. logger.info(f"[工作流 {workflow_id}] 步骤 3/4: 生成引用")
  107. citations = []
  108. try:
  109. from api.routes.citations import validate_citation, CitationValidationRequest
  110. # 为每篇论文生成引用
  111. for paper in papers[:3]:
  112. try:
  113. # 构建引用文本
  114. authors_str = ", ".join(paper["authors"][:3])
  115. if len(paper["authors"]) > 3:
  116. authors_str += " et al."
  117. citation_text = f"{authors_str} ({paper['published_date'][:4]}). {paper['title']}. arXiv:{paper['id']}"
  118. citation_result = await validate_citation(CitationValidationRequest(
  119. citation=citation_text,
  120. format=request.citation_format
  121. ))
  122. citations.append({
  123. "paper_id": paper["id"],
  124. "title": paper["title"],
  125. "formatted_citation": citation_result.get("formatted_citation", "")
  126. })
  127. except Exception as e:
  128. logger.warning(f"生成引用失败: {str(e)}")
  129. continue
  130. results["steps"].append({
  131. "step": 3,
  132. "name": "Validator - 引用生成",
  133. "status": "completed",
  134. "result": {
  135. "total_citations": len(citations),
  136. "citations": citations
  137. }
  138. })
  139. except Exception as e:
  140. logger.error(f"引用生成失败: {str(e)}")
  141. results["steps"].append({
  142. "step": 3,
  143. "name": "Validator - 引用生成",
  144. "status": "failed",
  145. "error": str(e)
  146. })
  147. # 步骤 4: Coach - 生成综合报告(可选)
  148. if request.writing_task:
  149. logger.info(f"[工作流 {workflow_id}] 步骤 4/4: 生成报告")
  150. try:
  151. from api.routes.writing import writing_coach, WritingCoachRequest
  152. # 构建综合报告文本
  153. report_text = f"# 关于 '{request.keywords}' 的研究综述\n\n"
  154. report_text += f"## 搜索结果\n找到 {len(papers)} 篇相关论文\n\n"
  155. if analyses:
  156. report_text += "## 论文分析\n"
  157. for i, analysis in enumerate(analyses[:3], 1):
  158. report_text += f"\n### {i}. {analysis['title']}\n"
  159. report_text += f"{analysis['analysis'][:500]}...\n"
  160. if citations:
  161. report_text += "\n## 参考文献\n"
  162. for i, citation in enumerate(citations, 1):
  163. report_text += f"{i}. {citation['formatted_citation']}\n"
  164. # 使用 Coach 改进报告
  165. writing_result = await writing_coach(WritingCoachRequest(
  166. text=report_text,
  167. style="academic",
  168. task=request.writing_task
  169. ))
  170. results["steps"].append({
  171. "step": 4,
  172. "name": "Coach - 报告生成",
  173. "status": "completed",
  174. "result": {
  175. "report": writing_result.get("result", "")
  176. }
  177. })
  178. except Exception as e:
  179. logger.error(f"报告生成失败: {str(e)}")
  180. results["steps"].append({
  181. "step": 4,
  182. "name": "Coach - 报告生成",
  183. "status": "failed",
  184. "error": str(e)
  185. })
  186. # 完成工作流
  187. results["status"] = "completed"
  188. results["summary"] = {
  189. "total_papers": len(papers),
  190. "analyzed_papers": len(analyses),
  191. "generated_citations": len(citations),
  192. "keywords": request.keywords
  193. }
  194. logger.info(f"[工作流 {workflow_id}] 完成")
  195. return results
  196. except HTTPException:
  197. raise
  198. except Exception as e:
  199. logger.error(f"工作流执行失败: {str(e)}")
  200. raise HTTPException(status_code=500, detail=f"工作流执行失败: {str(e)}")
  201. @router.post("/search-and-analyze", response_model=Dict[str, Any])
  202. async def search_and_analyze(request: WorkflowRequest):
  203. """
  204. 简化工作流:搜索 + 分析
  205. 只执行搜索和分析步骤
  206. """
  207. try:
  208. results = {
  209. "status": "running",
  210. "steps": []
  211. }
  212. # 步骤 1: 搜索论文
  213. from api.routes.papers import search_papers, PaperSearchRequest
  214. search_result = await search_papers(PaperSearchRequest(
  215. keywords=request.keywords,
  216. source="arxiv",
  217. limit=request.limit
  218. ))
  219. papers = search_result.get("papers", [])
  220. results["steps"].append({
  221. "step": 1,
  222. "name": "搜索论文",
  223. "status": "completed",
  224. "papers": papers
  225. })
  226. if not papers:
  227. raise HTTPException(status_code=404, detail="未找到相关论文")
  228. # 步骤 2: 分析第一篇论文
  229. from api.routes.analysis import analyze_paper, PaperAnalysisRequest
  230. first_paper = papers[0]
  231. analysis_result = await analyze_paper(PaperAnalysisRequest(
  232. paper_url=first_paper["url"],
  233. analysis_type=request.analysis_type
  234. ))
  235. results["steps"].append({
  236. "step": 2,
  237. "name": "分析论文",
  238. "status": "completed",
  239. "analysis": analysis_result
  240. })
  241. results["status"] = "completed"
  242. return results
  243. except HTTPException:
  244. raise
  245. except Exception as e:
  246. logger.error(f"搜索和分析失败: {str(e)}")
  247. raise HTTPException(status_code=500, detail=f"执行失败: {str(e)}")
  248. @router.get("/status/{workflow_id}")
  249. async def get_workflow_status(workflow_id: str):
  250. """获取工作流状态"""
  251. try:
  252. # 这里可以实现工作流状态跟踪
  253. # 暂时返回模拟状态
  254. return {
  255. "workflow_id": workflow_id,
  256. "status": "completed",
  257. "progress": 100,
  258. "message": "工作流已完成"
  259. }
  260. except Exception as e:
  261. logger.error(f"获取工作流状态失败: {str(e)}")
  262. raise HTTPException(status_code=500, detail=str(e))