analysis_service.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. """
  2. 智能股票分析助手 — 分析报告服务层
  3. 协调多数据源(行情、财务、资讯),生成个股深度分析报告。
  4. 支持报告持久化存储与历史查询。
  5. """
  6. import sys
  7. import json
  8. from pathlib import Path
  9. from typing import Optional
  10. from datetime import datetime
  11. _PROJECT_ROOT = Path(__file__).parent.parent.parent.parent # backend/app/services -> project root
  12. _BACKEND_DIR = _PROJECT_ROOT
  13. for p in [str(_PROJECT_ROOT), str(_BACKEND_DIR)]:
  14. if str(p) not in sys.path:
  15. sys.path.insert(0, str(p))
  16. from app.models.database import async_session_factory
  17. from app.models.report import AnalysisReport
  18. from app.services.market_service import get_stock_quote, get_stock_financial, get_stock_profile
  19. from app.services.news_service import analyze_sentiment
  20. async def generate_analysis_report(
  21. stock_code: str,
  22. user_id: str = "default",
  23. report_type: str = "full",
  24. ) -> dict:
  25. """生成个股深度分析报告
  26. 收集行情数据、财务数据、公司概况、舆情信息,整合为结构化分析报告。
  27. Args:
  28. stock_code: 6位股票代码
  29. user_id: 用户标识
  30. report_type: 报告类型 full/quick
  31. Returns:
  32. {
  33. "success": True/False,
  34. "report": { ... } or None,
  35. "error": str or None,
  36. "data_collected": dict # 各数据源的收集状态
  37. }
  38. """
  39. result = {
  40. "success": False,
  41. "report": None,
  42. "error": None,
  43. "data_collected": {},
  44. }
  45. try:
  46. # 阶段1: 收集行情数据
  47. quote_data = get_stock_quote(stock_code)
  48. result["data_collected"]["quote"] = quote_data["success"]
  49. # 阶段2: 收集财务数据
  50. financial_data = get_stock_financial(stock_code)
  51. result["data_collected"]["financial"] = financial_data["success"]
  52. # 阶段3: 收集公司概况
  53. profile_data = get_stock_profile(stock_code)
  54. result["data_collected"]["profile"] = profile_data["success"]
  55. # 阶段4: 收集舆情数据(异步)
  56. sentiment_data = analyze_sentiment(stock_code)
  57. result["data_collected"]["sentiment"] = sentiment_data["success"]
  58. # 阶段5: 构建报告内容
  59. stock_name = _extract_stock_name(profile_data)
  60. report_content = _build_report_content(
  61. stock_code, stock_name, report_type,
  62. quote_data, financial_data, profile_data, sentiment_data
  63. )
  64. report_summary = _generate_summary(report_content)
  65. # 阶段6: 持久化报告
  66. async with async_session_factory() as db:
  67. data_snapshot = json.dumps({
  68. "quote": {"success": quote_data["success"], "tables": quote_data.get("tables", [])},
  69. "financial": {"success": financial_data["success"], "tables": financial_data.get("tables", [])},
  70. "profile": {"success": profile_data["success"], "tables": profile_data.get("tables", [])},
  71. "sentiment": {
  72. "success": sentiment_data["success"],
  73. "total_count": sentiment_data.get("total_count", 0),
  74. },
  75. }, ensure_ascii=False)
  76. report = AnalysisReport(
  77. user_id=user_id,
  78. stock_code=stock_code,
  79. stock_name=stock_name,
  80. report_type=report_type,
  81. summary=report_summary,
  82. content=report_content,
  83. data_snapshot=data_snapshot,
  84. )
  85. db.add(report)
  86. await db.commit()
  87. await db.refresh(report)
  88. result["report"] = report.to_dict()
  89. result["success"] = True
  90. return result
  91. except Exception as e:
  92. result["error"] = str(e)
  93. return result
  94. async def get_report(report_id: int) -> dict:
  95. """获取指定报告
  96. Args:
  97. report_id: 报告ID
  98. Returns:
  99. {"success": True/False, "report": {...} or None, "error": str or None}
  100. """
  101. result = {"success": False, "report": None, "error": None}
  102. try:
  103. async with async_session_factory() as db:
  104. from sqlalchemy import select
  105. stmt = select(AnalysisReport).where(AnalysisReport.id == report_id)
  106. db_result = await db.execute(stmt)
  107. report = db_result.scalar_one_or_none()
  108. if report is None:
  109. result["error"] = f"报告 {report_id} 不存在"
  110. return result
  111. result["report"] = report.to_dict()
  112. result["success"] = True
  113. return result
  114. except Exception as e:
  115. result["error"] = str(e)
  116. return result
  117. async def get_user_reports(user_id: str = "default", limit: int = 20) -> dict:
  118. """获取用户的历史分析报告列表
  119. Args:
  120. user_id: 用户标识
  121. limit: 最大返回数量
  122. Returns:
  123. {"success": True/False, "reports": [...], "total": int, "error": str or None}
  124. """
  125. result = {"success": False, "reports": [], "total": 0, "error": None}
  126. try:
  127. async with async_session_factory() as db:
  128. from sqlalchemy import select, func
  129. # 查询总数
  130. count_stmt = select(func.count(AnalysisReport.id)).where(
  131. AnalysisReport.user_id == user_id
  132. )
  133. db_result = await db.execute(count_stmt)
  134. total = db_result.scalar() or 0
  135. # 查询列表
  136. stmt = (
  137. select(AnalysisReport)
  138. .where(AnalysisReport.user_id == user_id)
  139. .order_by(AnalysisReport.created_at.desc())
  140. .limit(limit)
  141. )
  142. db_result = await db.execute(stmt)
  143. reports = db_result.scalars().all()
  144. result["reports"] = [r.to_dict() for r in reports]
  145. result["total"] = total
  146. result["success"] = True
  147. return result
  148. except Exception as e:
  149. result["error"] = str(e)
  150. return result
  151. def _extract_stock_name(profile_data: dict) -> str:
  152. """从公司概况数据中提取股票名称"""
  153. try:
  154. tables = profile_data.get("tables", [])
  155. for table in tables:
  156. rows = table.get("rows", [])
  157. for row in rows:
  158. for key in row:
  159. if "名称" in key or "简称" in key:
  160. return str(row[key])
  161. return ""
  162. except Exception:
  163. return ""
  164. def _build_report_content(
  165. stock_code: str,
  166. stock_name: str,
  167. report_type: str,
  168. quote_data: dict,
  169. financial_data: dict,
  170. profile_data: dict,
  171. sentiment_data: dict,
  172. ) -> str:
  173. """构建报告Markdown内容"""
  174. title = stock_name or stock_code
  175. now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  176. lines = []
  177. lines.append(f"# {title}({stock_code})深度分析报告")
  178. lines.append(f"**生成时间**: {now}")
  179. lines.append(f"**报告类型**: {'完整分析' if report_type == 'full' else '快速概览'}")
  180. lines.append("")
  181. lines.append("---")
  182. lines.append("")
  183. # 1. 行情概览
  184. lines.append("## 一、行情概览")
  185. if quote_data.get("success"):
  186. lines.append(_format_data_section(quote_data))
  187. else:
  188. lines.append("> ⚠️ 行情数据获取失败")
  189. if quote_data.get("error"):
  190. lines.append(f"> 原因: {quote_data['error']}")
  191. lines.append("")
  192. # 2. 财务分析
  193. lines.append("## 二、财务分析")
  194. if financial_data.get("success"):
  195. lines.append(_format_data_section(financial_data))
  196. else:
  197. lines.append("> ⚠️ 财务数据获取失败")
  198. if financial_data.get("error"):
  199. lines.append(f"> 原因: {financial_data['error']}")
  200. lines.append("")
  201. # 3. 公司概况
  202. lines.append("## 三、公司概况")
  203. if profile_data.get("success"):
  204. lines.append(_format_data_section(profile_data))
  205. else:
  206. lines.append("> ⚠️ 公司概况获取失败")
  207. if profile_data.get("error"):
  208. lines.append(f"> 原因: {profile_data['error']}")
  209. lines.append("")
  210. # 4. 舆情分析
  211. lines.append("## 四、舆情分析")
  212. if sentiment_data.get("success"):
  213. total = sentiment_data.get("total_count", 0)
  214. news_count = len(sentiment_data.get("news_items", []))
  215. report_count = len(sentiment_data.get("report_items", []))
  216. ann_count = len(sentiment_data.get("announce_items", []))
  217. lines.append(f"- 相关资讯总数: {total} 条")
  218. lines.append(f" - 新闻: {news_count} 条")
  219. lines.append(f" - 研报: {report_count} 条")
  220. lines.append(f" - 公告: {ann_count} 条")
  221. if news_count > 0:
  222. lines.append("")
  223. lines.append("### 近期新闻")
  224. for item in sentiment_data.get("news_items", [])[:5]:
  225. title = item.get("title", "")
  226. date = item.get("date", "").split()[0] if item.get("date") else ""
  227. institution = item.get("institution", "")
  228. source = f" — {institution}" if institution else ""
  229. lines.append(f"- [{date}] {title}{source}")
  230. else:
  231. lines.append("> ⚠️ 舆情数据获取失败")
  232. if sentiment_data.get("error"):
  233. lines.append(f"> 原因: {sentiment_data['error']}")
  234. lines.append("")
  235. # 5. 综合评估
  236. lines.append("## 五、综合评估")
  237. lines.append("> 基于以上数据的综合评估分析如下:")
  238. lines.append("")
  239. collected_count = sum(1 for v in [
  240. quote_data.get("success"),
  241. financial_data.get("success"),
  242. profile_data.get("success"),
  243. sentiment_data.get("success"),
  244. ] if v)
  245. if collected_count >= 3:
  246. lines.append(f"数据收集完成度: {collected_count}/4,综合分析可用。")
  247. lines.append("")
  248. lines.append("### 估值参考(需结合AI Agent深度分析)")
  249. lines.append("- 请参考【行情概览】部分的实时估值数据")
  250. lines.append("- 请参考【财务分析】部分的ROE、净利润等核心指标")
  251. lines.append("")
  252. else:
  253. lines.append(f"⚠️ 数据收集不完整({collected_count}/4),建议检查API Key配置后重试。")
  254. lines.append("### 投资建议")
  255. lines.append("> ⚠️ **免责声明**: 本报告由智能股票分析助手自动生成,所有数据来源于东方财富妙想API。")
  256. lines.append("> 分析结果仅供参考和学习,不构成任何投资建议。投资有风险,入市需谨慎。")
  257. lines.append("")
  258. return "\n".join(lines)
  259. def _format_data_section(data: dict) -> str:
  260. """将数据表格格式化为Markdown"""
  261. lines = []
  262. tables = data.get("tables", [])
  263. if not tables:
  264. return "(暂无数据)"
  265. for table in tables[:3]: # 最多显示3个表
  266. sheet_name = table.get("sheet_name", "")
  267. rows = table.get("rows", [])
  268. fieldnames = table.get("fieldnames", [])
  269. if sheet_name:
  270. lines.append(f"### {sheet_name}")
  271. if not rows:
  272. lines.append("(无数据)")
  273. continue
  274. # 限制行数
  275. display_rows = rows[:10]
  276. display_fields = fieldnames[:8]
  277. # 表头
  278. header = " | ".join(display_fields)
  279. lines.append(f"| {header} |")
  280. lines.append(f"|{'|'.join(['---'] * len(display_fields))}|")
  281. for row in display_rows:
  282. values = [str(row.get(col, "")) for col in display_fields]
  283. lines.append(f"| {' | '.join(values)} |")
  284. if len(rows) > 10:
  285. lines.append(f"*(共{len(rows)}行,仅显示前10行)*")
  286. lines.append("")
  287. return "\n".join(lines)
  288. def _generate_summary(report_content: str) -> str:
  289. """从报告内容中生成简短摘要"""
  290. # 从报告内容提取关键信息生成摘要
  291. try:
  292. lines = report_content.split("\n")
  293. data_status = ""
  294. for line in lines:
  295. if "数据收集完成度" in line:
  296. data_status = line.strip()
  297. break
  298. return f"[智能股票分析助手] 分析报告已生成。{data_status}详见完整报告。"
  299. except Exception:
  300. return "分析报告已生成,请查看完整内容。"