| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 |
- """
- 工作流API路由 - 协调多个智能体完成复杂任务
- """
- from fastapi import APIRouter, HTTPException
- from typing import Dict, Any, Optional, List
- from pydantic import BaseModel
- import logging
- import asyncio
- from agents.controller import agent_controller
- logger = logging.getLogger(__name__)
- router = APIRouter()
- # Pydantic模型
- class WorkflowRequest(BaseModel):
- keywords: str
- analysis_type: str = "summary" # summary, innovation, comparison, comprehensive
- citation_format: str = "bibtex" # bibtex, apa, ieee, mla
- writing_task: Optional[str] = None # improve, polish, translate
- limit: int = 5 # 搜索论文数量
- class WorkflowStatus(BaseModel):
- workflow_id: str
- status: str # running, completed, failed
- current_step: str
- progress: int # 0-100
- @router.post("/complete", response_model=Dict[str, Any])
- async def complete_workflow(request: WorkflowRequest):
- """
- 完整工作流:搜索 -> 分析 -> 校验引用 -> 写作辅助
- 自动协调所有智能体完成任务
- """
- try:
- workflow_id = f"workflow_{asyncio.get_event_loop().time()}"
- results = {
- "workflow_id": workflow_id,
- "status": "running",
- "steps": []
- }
-
- # 步骤 1: Hunter - 搜索论文
- logger.info(f"[工作流 {workflow_id}] 步骤 1/4: 搜索论文")
- try:
- from api.routes.papers import search_papers, PaperSearchRequest
-
- search_result = await search_papers(PaperSearchRequest(
- keywords=request.keywords,
- source="arxiv",
- limit=request.limit
- ))
-
- papers = search_result.get("papers", [])
- results["steps"].append({
- "step": 1,
- "name": "Hunter - 论文搜索",
- "status": "completed",
- "result": {
- "total_found": len(papers),
- "papers": papers
- }
- })
-
- if not papers:
- raise HTTPException(status_code=404, detail="未找到相关论文")
-
- except Exception as e:
- logger.error(f"论文搜索失败: {str(e)}")
- results["steps"].append({
- "step": 1,
- "name": "Hunter - 论文搜索",
- "status": "failed",
- "error": str(e)
- })
- results["status"] = "failed"
- return results
-
- # 步骤 2: Miner - 分析每篇论文
- logger.info(f"[工作流 {workflow_id}] 步骤 2/4: 分析论文")
- analyses = []
- try:
- from api.routes.analysis import analyze_paper, PaperAnalysisRequest
-
- # 分析前3篇论文
- for i, paper in enumerate(papers[:3]):
- try:
- analysis_result = await analyze_paper(PaperAnalysisRequest(
- paper_url=paper["url"],
- analysis_type=request.analysis_type
- ))
- analyses.append({
- "paper_id": paper["id"],
- "title": paper["title"],
- "analysis": analysis_result.get("analysis", "")
- })
- except Exception as e:
- logger.warning(f"分析论文 {paper['id']} 失败: {str(e)}")
- continue
-
- results["steps"].append({
- "step": 2,
- "name": "Miner - 论文分析",
- "status": "completed",
- "result": {
- "total_analyzed": len(analyses),
- "analyses": analyses
- }
- })
-
- except Exception as e:
- logger.error(f"论文分析失败: {str(e)}")
- results["steps"].append({
- "step": 2,
- "name": "Miner - 论文分析",
- "status": "failed",
- "error": str(e)
- })
-
- # 步骤 3: Validator - 生成和校验引用
- logger.info(f"[工作流 {workflow_id}] 步骤 3/4: 生成引用")
- citations = []
- try:
- from api.routes.citations import validate_citation, CitationValidationRequest
-
- # 为每篇论文生成引用
- for paper in papers[:3]:
- try:
- # 构建引用文本
- authors_str = ", ".join(paper["authors"][:3])
- if len(paper["authors"]) > 3:
- authors_str += " et al."
-
- citation_text = f"{authors_str} ({paper['published_date'][:4]}). {paper['title']}. arXiv:{paper['id']}"
-
- citation_result = await validate_citation(CitationValidationRequest(
- citation=citation_text,
- format=request.citation_format
- ))
-
- citations.append({
- "paper_id": paper["id"],
- "title": paper["title"],
- "formatted_citation": citation_result.get("formatted_citation", "")
- })
- except Exception as e:
- logger.warning(f"生成引用失败: {str(e)}")
- continue
-
- results["steps"].append({
- "step": 3,
- "name": "Validator - 引用生成",
- "status": "completed",
- "result": {
- "total_citations": len(citations),
- "citations": citations
- }
- })
-
- except Exception as e:
- logger.error(f"引用生成失败: {str(e)}")
- results["steps"].append({
- "step": 3,
- "name": "Validator - 引用生成",
- "status": "failed",
- "error": str(e)
- })
-
- # 步骤 4: Coach - 生成综合报告(可选)
- if request.writing_task:
- logger.info(f"[工作流 {workflow_id}] 步骤 4/4: 生成报告")
- try:
- from api.routes.writing import writing_coach, WritingCoachRequest
-
- # 构建综合报告文本
- report_text = f"# 关于 '{request.keywords}' 的研究综述\n\n"
- report_text += f"## 搜索结果\n找到 {len(papers)} 篇相关论文\n\n"
-
- if analyses:
- report_text += "## 论文分析\n"
- for i, analysis in enumerate(analyses[:3], 1):
- report_text += f"\n### {i}. {analysis['title']}\n"
- report_text += f"{analysis['analysis'][:500]}...\n"
-
- if citations:
- report_text += "\n## 参考文献\n"
- for i, citation in enumerate(citations, 1):
- report_text += f"{i}. {citation['formatted_citation']}\n"
-
- # 使用 Coach 改进报告
- writing_result = await writing_coach(WritingCoachRequest(
- text=report_text,
- style="academic",
- task=request.writing_task
- ))
-
- results["steps"].append({
- "step": 4,
- "name": "Coach - 报告生成",
- "status": "completed",
- "result": {
- "report": writing_result.get("result", "")
- }
- })
-
- except Exception as e:
- logger.error(f"报告生成失败: {str(e)}")
- results["steps"].append({
- "step": 4,
- "name": "Coach - 报告生成",
- "status": "failed",
- "error": str(e)
- })
-
- # 完成工作流
- results["status"] = "completed"
- results["summary"] = {
- "total_papers": len(papers),
- "analyzed_papers": len(analyses),
- "generated_citations": len(citations),
- "keywords": request.keywords
- }
-
- logger.info(f"[工作流 {workflow_id}] 完成")
- return results
-
- except HTTPException:
- raise
- except Exception as e:
- logger.error(f"工作流执行失败: {str(e)}")
- raise HTTPException(status_code=500, detail=f"工作流执行失败: {str(e)}")
- @router.post("/search-and-analyze", response_model=Dict[str, Any])
- async def search_and_analyze(request: WorkflowRequest):
- """
- 简化工作流:搜索 + 分析
- 只执行搜索和分析步骤
- """
- try:
- results = {
- "status": "running",
- "steps": []
- }
-
- # 步骤 1: 搜索论文
- from api.routes.papers import search_papers, PaperSearchRequest
-
- search_result = await search_papers(PaperSearchRequest(
- keywords=request.keywords,
- source="arxiv",
- limit=request.limit
- ))
-
- papers = search_result.get("papers", [])
- results["steps"].append({
- "step": 1,
- "name": "搜索论文",
- "status": "completed",
- "papers": papers
- })
-
- if not papers:
- raise HTTPException(status_code=404, detail="未找到相关论文")
-
- # 步骤 2: 分析第一篇论文
- from api.routes.analysis import analyze_paper, PaperAnalysisRequest
-
- first_paper = papers[0]
- analysis_result = await analyze_paper(PaperAnalysisRequest(
- paper_url=first_paper["url"],
- analysis_type=request.analysis_type
- ))
-
- results["steps"].append({
- "step": 2,
- "name": "分析论文",
- "status": "completed",
- "analysis": analysis_result
- })
-
- results["status"] = "completed"
- return results
-
- except HTTPException:
- raise
- except Exception as e:
- logger.error(f"搜索和分析失败: {str(e)}")
- raise HTTPException(status_code=500, detail=f"执行失败: {str(e)}")
- @router.get("/status/{workflow_id}")
- async def get_workflow_status(workflow_id: str):
- """获取工作流状态"""
- try:
- # 这里可以实现工作流状态跟踪
- # 暂时返回模拟状态
- return {
- "workflow_id": workflow_id,
- "status": "completed",
- "progress": 100,
- "message": "工作流已完成"
- }
- except Exception as e:
- logger.error(f"获取工作流状态失败: {str(e)}")
- raise HTTPException(status_code=500, detail=str(e))
|