"""
InnoCore AI insight expert (Miner Agent).

The core brain: reads and understands papers, searches the history store,
runs comparative analysis and generates reports.
"""
  5. import asyncio
  6. from typing import Dict, List, Optional, Any
  7. import json
  8. import re
  9. from datetime import datetime
  10. from agents.base import BaseAgent
  11. from core.database import db_manager
  12. from core.vector_store import vector_store_manager
  13. from core.exceptions import AgentException
  14. class MinerAgent(BaseAgent):
  15. """洞察专家智能体"""
  16. def __init__(self, llm=None):
  17. super().__init__("Miner", llm)
  18. # 添加工具
  19. self.add_tool("parse_pdf", self._parse_pdf, "解析PDF文件")
  20. self.add_tool("search_memory", self._search_memory, "搜索记忆库")
  21. self.add_tool("compare_papers", self._compare_papers, "对比论文")
  22. self.add_tool("generate_report", self._generate_report, "生成分析报告")
  23. async def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
  24. """执行论文分析和创新点挖掘任务"""
  25. await self.validate_input(input_data)
  26. self.set_state("running")
  27. try:
  28. paper_id = input_data["paper_id"]
  29. user_id = input_data.get("user_id")
  30. analysis_type = input_data.get("analysis_type", "full") # full, quick, innovation_only
  31. # 获取论文信息
  32. paper = await db_manager.get_paper(paper_id)
  33. if not paper:
  34. raise AgentException(f"论文不存在: {paper_id}")
  35. self._add_to_history(f"开始分析论文: {paper['title']}")
  36. # 1. 解析PDF内容
  37. parsed_content = await self._parse_paper_content(paper)
  38. # 2. 检索相关历史论文
  39. related_papers = await self._find_related_papers(
  40. paper["title"],
  41. paper["abstract"],
  42. user_id
  43. )
  44. # 3. 进行对比分析
  45. comparison_result = await self._perform_comparison_analysis(
  46. parsed_content,
  47. related_papers
  48. )
  49. # 4. 生成分析报告
  50. report = await self._create_analysis_report(
  51. paper,
  52. parsed_content,
  53. related_papers,
  54. comparison_result,
  55. user_id
  56. )
  57. # 5. 保存报告到数据库
  58. report_id = await self._save_analysis_report(paper_id, report, user_id)
  59. # 6. 更新向量库
  60. await self._update_vector_store(paper_id, paper, parsed_content, user_id)
  61. self.set_state("completed")
  62. return {
  63. "status": "success",
  64. "paper_id": paper_id,
  65. "report_id": report_id,
  66. "analysis_type": analysis_type,
  67. "parsed_content": {
  68. "sections": list(parsed_content.get("sections", {}).keys()),
  69. "word_count": parsed_content.get("word_count", 0)
  70. },
  71. "related_papers_count": len(related_papers),
  72. "report_summary": {
  73. "summary": report.get("summary", "")[:200] + "...",
  74. "innovation_points": len(report.get("innovation_points", [])),
  75. "limitations": len(report.get("limitations", [])),
  76. "future_ideas": len(report.get("future_ideas", []))
  77. }
  78. }
  79. except Exception as e:
  80. self.set_state("error")
  81. raise AgentException(f"Miner Agent执行失败: {str(e)}")
  82. def get_required_fields(self) -> List[str]:
  83. """获取必需的输入字段"""
  84. return ["paper_id"]
  85. async def _parse_paper_content(self, paper: Dict) -> Dict[str, Any]:
  86. """解析论文内容"""
  87. file_path = paper.get("file_path")
  88. if not file_path:
  89. # 如果没有PDF文件,使用标题和摘要
  90. return {
  91. "title": paper.get("title", ""),
  92. "abstract": paper.get("abstract", ""),
  93. "sections": {
  94. "abstract": paper.get("abstract", ""),
  95. "introduction": "",
  96. "method": "",
  97. "experiment": "",
  98. "conclusion": ""
  99. },
  100. "word_count": len(paper.get("abstract", "").split()),
  101. "parsing_method": "metadata_only"
  102. }
  103. # 这里应该使用专门的PDF解析库
  104. # 暂时返回模拟的结构化内容
  105. return await self._extract_structured_content(file_path)
  106. async def _extract_structured_content(self, file_path: str) -> Dict[str, Any]:
  107. """提取结构化内容"""
  108. try:
  109. # 这里应该集成Nougat或PyMuPDF进行深度解析
  110. # 暂时返回模拟数据
  111. mock_content = {
  112. "title": "Sample Paper Title",
  113. "abstract": "This is a sample abstract...",
  114. "sections": {
  115. "introduction": "In this paper, we propose...",
  116. "method": "Our method consists of...",
  117. "experiment": "We conducted experiments...",
  118. "conclusion": "The results show that..."
  119. },
  120. "word_count": 1500,
  121. "parsing_method": "mock_parser"
  122. }
  123. self._add_to_history(f"PDF解析完成: {file_path}")
  124. return mock_content
  125. except Exception as e:
  126. self._add_to_history(f"PDF解析失败: {str(e)}")
  127. return {
  128. "title": "",
  129. "abstract": "",
  130. "sections": {},
  131. "word_count": 0,
  132. "parsing_method": "failed"
  133. }
  134. async def _find_related_papers(self, title: str, abstract: str, user_id: str = None) -> List[Dict]:
  135. """查找相关论文"""
  136. try:
  137. # 构建查询
  138. query = f"{title} {abstract}"
  139. # 执行混合搜索
  140. search_results = await vector_store_manager.hybrid_search(
  141. query=query,
  142. user_id=user_id,
  143. top_k=10,
  144. include_l1=True,
  145. include_l2=bool(user_id)
  146. )
  147. # 获取详细论文信息
  148. related_papers = []
  149. for result in search_results:
  150. payload = result["payload"]
  151. paper_id = payload.get("paper_id")
  152. if paper_id:
  153. paper_info = await db_manager.get_paper(paper_id)
  154. if paper_info:
  155. paper_info["similarity_score"] = result["score"]
  156. paper_info["collection_type"] = result["collection_type"]
  157. related_papers.append(paper_info)
  158. self._add_to_history(f"找到 {len(related_papers)} 篇相关论文")
  159. return related_papers
  160. except Exception as e:
  161. self._add_to_history(f"搜索相关论文失败: {str(e)}")
  162. return []
  163. async def _perform_comparison_analysis(self, current_paper: Dict, related_papers: List[Dict]) -> Dict[str, Any]:
  164. """执行对比分析"""
  165. if not related_papers:
  166. return {
  167. "comparison_summary": "未找到相关论文进行对比",
  168. "unique_contributions": [],
  169. "similar_works": [],
  170. "gaps_identified": []
  171. }
  172. # 构建对比分析的prompt
  173. comparison_prompt = f"""
  174. 请分析当前论文与历史相关论文的对比情况:
  175. 当前论文:
  176. 标题:{current_paper.get('title', '')}
  177. 摘要:{current_paper.get('abstract', '')}
  178. 主要内容:{str(current_paper.get('sections', {}))[:1000]}...
  179. 相关论文:
  180. {self._format_related_papers_for_comparison(related_papers[:5])}
  181. 请从以下角度进行对比分析:
  182. 1. 方法的创新性和改进点
  183. 2. 实验设计的优势
  184. 3. 与现有工作的区别
  185. 4. 可能的研究空白
  186. 请以JSON格式返回分析结果。
  187. """
  188. try:
  189. response = await self.think(comparison_prompt)
  190. # 尝试解析JSON响应
  191. try:
  192. comparison_result = json.loads(response)
  193. except json.JSONDecodeError:
  194. # 如果JSON解析失败,使用文本解析
  195. comparison_result = self._parse_text_comparison(response)
  196. self._add_to_history("对比分析完成")
  197. return comparison_result
  198. except Exception as e:
  199. self._add_to_history(f"对比分析失败: {str(e)}")
  200. return {
  201. "comparison_summary": "对比分析过程中出现错误",
  202. "unique_contributions": [],
  203. "similar_works": [],
  204. "gaps_identified": []
  205. }
  206. def _format_related_papers_for_comparison(self, papers: List[Dict]) -> str:
  207. """格式化相关论文用于对比"""
  208. formatted = []
  209. for i, paper in enumerate(papers, 1):
  210. formatted.append(f"""
  211. 论文{i}:
  212. 标题:{paper.get('title', '')}
  213. 摘要:{paper.get('abstract', '')[:300]}...
  214. 相似度:{paper.get('similarity_score', 0):.3f}
  215. """)
  216. return "\n".join(formatted)
  217. def _parse_text_comparison(self, text: str) -> Dict[str, Any]:
  218. """解析文本格式的对比结果"""
  219. # 简单的文本解析逻辑
  220. return {
  221. "comparison_summary": text[:500],
  222. "unique_contributions": ["基于文本分析的创新点"],
  223. "similar_works": ["相关研究工作"],
  224. "gaps_identified": ["研究空白识别"]
  225. }
  226. async def _create_analysis_report(self, paper: Dict, parsed_content: Dict,
  227. related_papers: List[Dict], comparison_result: Dict,
  228. user_id: str = None) -> Dict[str, Any]:
  229. """创建分析报告"""
  230. report_prompt = f"""
  231. 基于以下信息,生成一份详细的论文分析报告:
  232. 论文信息:
  233. 标题:{paper.get('title', '')}
  234. 作者:{', '.join(paper.get('authors', []))}
  235. 摘要:{paper.get('abstract', '')}
  236. 解析内容:
  237. {str(parsed_content.get('sections', {}))[:1500]}...
  238. 对比分析结果:
  239. {str(comparison_result)[:1000]}...
  240. 请生成包含以下部分的报告:
  241. 1. Summary - 论文主要贡献和方法概述
  242. 2. Innovation - 相比相关论文的创新点
  243. 3. Limitation - 当前研究的局限性
  244. 4. Future Ideas - 基于分析的未来研究方向建议
  245. 请以JSON格式返回报告。
  246. """
  247. try:
  248. response = await self.think(report_prompt)
  249. # 尝试解析JSON响应
  250. try:
  251. report = json.loads(response)
  252. except json.JSONDecodeError:
  253. # 如果JSON解析失败,生成默认报告
  254. report = self._generate_default_report(paper, parsed_content, comparison_result)
  255. # 添加元数据
  256. report.update({
  257. "paper_id": paper.get("id"),
  258. "generated_for_user_id": user_id,
  259. "generated_at": datetime.now().isoformat(),
  260. "related_papers_count": len(related_papers),
  261. "analysis_method": "miner_agent"
  262. })
  263. self._add_to_history("分析报告生成完成")
  264. return report
  265. except Exception as e:
  266. self._add_to_history(f"生成分析报告失败: {str(e)}")
  267. return self._generate_default_report(paper, parsed_content, comparison_result)
  268. def _generate_default_report(self, paper: Dict, parsed_content: Dict, comparison_result: Dict) -> Dict[str, Any]:
  269. """生成默认报告"""
  270. return {
  271. "summary": f"本文提出了{paper.get('title', '')}相关的研究工作。",
  272. "innovation_points": ["需要进一步分析的创新点"],
  273. "limitations": ["识别出的研究局限性"],
  274. "future_ideas": ["建议的未来研究方向"],
  275. "paper_id": paper.get("id"),
  276. "generated_at": datetime.now().isoformat(),
  277. "analysis_method": "default"
  278. }
  279. async def _save_analysis_report(self, paper_id: str, report: Dict, user_id: str = None) -> str:
  280. """保存分析报告到数据库"""
  281. try:
  282. report_id = await db_manager.create_analysis_report(
  283. paper_id=paper_id,
  284. summary=report.get("summary", ""),
  285. innovation_point=json.dumps(report.get("innovation_points", []), ensure_ascii=False),
  286. limitation=json.dumps(report.get("limitations", []), ensure_ascii=False),
  287. future_idea=json.dumps(report.get("future_ideas", []), ensure_ascii=False),
  288. vector_ids=report.get("vector_ids", {}),
  289. user_id=user_id
  290. )
  291. self._add_to_history(f"分析报告已保存: {report_id}")
  292. return report_id
  293. except Exception as e:
  294. self._add_to_history(f"保存分析报告失败: {str(e)}")
  295. return ""
  296. async def _update_vector_store(self, paper_id: str, paper: Dict, parsed_content: Dict, user_id: str = None):
  297. """更新向量库"""
  298. try:
  299. title = paper.get("title", "")
  300. abstract = paper.get("abstract", "")
  301. # 组合内容
  302. content = f"{title} {abstract}"
  303. sections = parsed_content.get("sections", {})
  304. if sections:
  305. content += " " + " ".join(sections.values())
  306. # 添加到L2用户库
  307. if user_id:
  308. await vector_store_manager.add_to_l2(
  309. user_id=user_id,
  310. paper_id=paper_id,
  311. title=title,
  312. abstract=abstract,
  313. content=content,
  314. metadata={
  315. "authors": paper.get("authors", []),
  316. "sections": list(sections.keys()),
  317. "word_count": parsed_content.get("word_count", 0),
  318. "analysis_date": datetime.now().isoformat()
  319. }
  320. )
  321. self._add_to_history(f"论文已添加到用户向量库: {user_id}")
  322. except Exception as e:
  323. self._add_to_history(f"更新向量库失败: {str(e)}")
    # Tool methods: thin wrappers registered through add_tool in __init__.
  325. async def _parse_pdf(self, file_path: str) -> Dict:
  326. """解析PDF工具"""
  327. return await self._extract_structured_content(file_path)
  328. async def _search_memory(self, query: str, user_id: str = None) -> List[Dict]:
  329. """搜索记忆库工具"""
  330. try:
  331. results = await vector_store_manager.hybrid_search(
  332. query=query,
  333. user_id=user_id,
  334. top_k=5
  335. )
  336. return [{"id": r["id"], "score": r["score"], "payload": r["payload"]} for r in results]
  337. except Exception as e:
  338. return [{"error": str(e)}]
  339. async def _compare_papers(self, current_paper: Dict, related_papers: List[Dict]) -> Dict:
  340. """对比论文工具"""
  341. return await self._perform_comparison_analysis(current_paper, related_papers)
  342. async def _generate_report(self, paper_info: Dict, analysis_result: Dict) -> Dict:
  343. """生成报告工具"""
  344. return await self._create_analysis_report(
  345. paper_info,
  346. analysis_result.get("parsed_content", {}),
  347. analysis_result.get("related_papers", []),
  348. analysis_result.get("comparison_result", {})
  349. )