news_service.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. """
  2. 智能股票分析助手 — 资讯搜索服务层
  3. 封装金融资讯搜索、个股舆情分析的数据查询逻辑。
  4. 含 mx-search 计时缓存与额度用尽时的缓存降级。
  5. """
  6. from __future__ import annotations
  7. import copy
  8. import sys
  9. from pathlib import Path
# Directory four levels above this file; used as the project root for the
# paths derived below.
_PROJECT_ROOT = Path(__file__).parent.parent.parent.parent
_AGENTS_DIR = _PROJECT_ROOT / "agents"
_SKILLS_SEARCH = _PROJECT_ROOT / "skills" / "资讯搜索" / "mx-search"
# Prepend each directory to sys.path unless it is already listed. Every entry
# is inserted at index 0, so entries later in this list end up earlier in
# sys.path.
# NOTE(review): presumably this is what makes `app.*` and the lazily imported
# `mx_search` module resolvable — confirm against the repository layout.
for p in [_AGENTS_DIR, _SKILLS_SEARCH, str(_PROJECT_ROOT)]:
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))
  16. from app.config import settings
  17. from app.services.mx_timed_cache import get_mx_timed_cache, mx_cache_ttl_seconds
  18. from app.utils.mx_fixture import try_load_raw_fixture
  19. from app.utils.mx_quota import MX_QUOTA_HINT, is_mx_quota_exhausted, quota_exhausted_no_cache_message
  20. def _meta_block(*, from_cache: bool, quota_exhausted: bool, channel: str) -> dict:
  21. m = {
  22. "from_cache": from_cache,
  23. "quota_exhausted": quota_exhausted,
  24. "cache_ttl_seconds": int(mx_cache_ttl_seconds()),
  25. "channel": channel,
  26. }
  27. if quota_exhausted:
  28. m["hint"] = MX_QUOTA_HINT
  29. return m
  30. def _attach(payload: dict, meta: dict) -> dict:
  31. out = copy.deepcopy(payload)
  32. out["_mx_meta"] = meta
  33. return out
  34. def _merge_item_text_fields(item: dict) -> str:
  35. """合并正文/摘要等字段(妙想不同条目可能落在 content、summary 等键上)。"""
  36. chunks: list[str] = []
  37. seen: set[str] = set()
  38. for key in (
  39. "content",
  40. "summary",
  41. "abstract",
  42. "digest",
  43. "snippet",
  44. "description",
  45. "desc",
  46. "text",
  47. ):
  48. v = item.get(key)
  49. if not isinstance(v, str):
  50. continue
  51. s = v.strip()
  52. if not s or s in seen:
  53. continue
  54. seen.add(s)
  55. chunks.append(s)
  56. return "\n\n".join(chunks)
  57. def _normalize_information_type(item: dict) -> tuple[str, str]:
  58. """将妙想条目上的类型字段统一为 (NEWS|REPORT|ANNOUNCEMENT|OTHER, 中文标签)。
  59. 常见坑:仅写了 info_type 默认 NEWS,但 info_type_cn 因 informationType 为空变成「资讯」,
  60. 前端饼图只认三种中文,导致大量条目不计入分布。
  61. """
  62. raw = item.get("informationType")
  63. if raw is None or (isinstance(raw, str) and not raw.strip()):
  64. for k in ("infoType", "information_type", "info_type", "docType", "category", "dataType"):
  65. v = item.get(k)
  66. if v is not None and str(v).strip():
  67. raw = v
  68. break
  69. cn_by_en = {"NEWS": "新闻", "REPORT": "研报", "ANNOUNCEMENT": "公告", "OTHER": "其他"}
  70. if raw is None or (isinstance(raw, str) and not str(raw).strip()):
  71. return "NEWS", cn_by_en["NEWS"]
  72. if isinstance(raw, (int, float)):
  73. # 无公开数字枚举说明时不猜测,避免错分进「仅研报」等畸形分布
  74. return "OTHER", cn_by_en["OTHER"]
  75. s = str(raw).strip()
  76. u = s.upper().replace(" ", "_").replace("-", "_")
  77. if u in ("NEWS", "REPORT", "ANNOUNCEMENT"):
  78. return u, cn_by_en[u]
  79. # 小写 json:news / report / announcement
  80. low = s.lower()
  81. if low in ("news",):
  82. return "NEWS", cn_by_en["NEWS"]
  83. if low in ("report",):
  84. return "REPORT", cn_by_en["REPORT"]
  85. if low in ("announcement", "announce"):
  86. return "ANNOUNCEMENT", cn_by_en["ANNOUNCEMENT"]
  87. # 中文或混合文案
  88. if "公告" in s:
  89. return "ANNOUNCEMENT", cn_by_en["ANNOUNCEMENT"]
  90. if "研报" in s or "研究报告" in s:
  91. return "REPORT", cn_by_en["REPORT"]
  92. if "新闻" in s:
  93. return "NEWS", cn_by_en["NEWS"]
  94. if "ANNOUNCE" in u or "NOTICE" in u:
  95. return "ANNOUNCEMENT", cn_by_en["ANNOUNCEMENT"]
  96. if "REPORT" in u:
  97. return "REPORT", cn_by_en["REPORT"]
  98. if "NEWS" in u:
  99. return "NEWS", cn_by_en["NEWS"]
  100. return "OTHER", cn_by_en["OTHER"]
# Link field names that may appear on Miaoxiang / Eastmoney news items.
# `_item_original_url` checks these keys in the order listed here before it
# falls back to scanning the remaining (possibly nested) values.
_URL_KEYS_ORDERED = (
    "url",
    "link",
    "articleUrl",
    "sourceUrl",
    "detailUrl",
    "pcUrl",
    "h5Url",
    "jumpUrl",
    "artUrl",
    "newsUrl",
    "oriUrl",
    "originUrl",
    "showUrl",
    "pageUrl",
    "wapUrl",
    "pcLink",
    "h5Link",
    "article_url",
    "news_url",
    "srcUrl",
    "webUrl",
    "mobileUrl",
    "urlPc",
    "urlH5",
    "pc_url",
    "h5_url",
    "shareUrl",
    "share_link",
)
# Keys skipped during the recursive URL scan: these hold body or title text,
# and skipping them avoids mistaking a text fragment for an external link.
_SKIP_URL_SCAN_KEYS = frozenset(
    {
        "content",
        "summary",
        "abstract",
        "digest",
        "snippet",
        "description",
        "desc",
        "text",
        "title",
        "body",
        "rawContent",
        "answer",
    }
)
  149. def _item_original_url(item: dict, *, depth: int = 0) -> str:
  150. """提取可外链打开的原文地址(若有)。兼容多层嵌套与非常规字段名。"""
  151. if not isinstance(item, dict) or depth > 6:
  152. return ""
  153. for key in _URL_KEYS_ORDERED:
  154. v = item.get(key)
  155. if isinstance(v, str):
  156. s = v.strip()
  157. if s.lower().startswith(("http://", "https://")):
  158. return s
  159. for k, v in item.items():
  160. if k in _SKIP_URL_SCAN_KEYS:
  161. continue
  162. if isinstance(v, str):
  163. s = v.strip()
  164. if len(s) > 2048:
  165. continue
  166. if s.lower().startswith(("http://", "https://")):
  167. return s
  168. elif isinstance(v, dict):
  169. inner = _item_original_url(v, depth=depth + 1)
  170. if inner:
  171. return inner
  172. elif isinstance(v, list) and depth < 4:
  173. for el in v[:24]:
  174. if isinstance(el, dict):
  175. inner = _item_original_url(el, depth=depth + 1)
  176. if inner:
  177. return inner
  178. return ""
  179. def _mx_search_from_raw(query: str, raw_result: dict) -> dict:
  180. """将 mx-search 原始响应转为统一 payload"""
  181. result = {
  182. "success": False,
  183. "query": query,
  184. "total_count": 0,
  185. "items": [],
  186. "error": None,
  187. }
  188. status = raw_result.get("status")
  189. if status != 0:
  190. result["error"] = raw_result.get("message", f"API返回错误,状态码: {status}")
  191. return result
  192. data = raw_result.get("data", {})
  193. inner_data = data.get("data", {})
  194. search_response = inner_data.get("llmSearchResponse", {})
  195. items = search_response.get("data", []) or []
  196. parsed_items = []
  197. for item in items:
  198. if not isinstance(item, dict):
  199. continue
  200. body = _merge_item_text_fields(item)
  201. en_type, cn_type = _normalize_information_type(item)
  202. parsed_items.append({
  203. "title": item.get("title", "无标题"),
  204. "content": body,
  205. "date": item.get("date", ""),
  206. "institution": item.get("insName", ""),
  207. "info_type": en_type,
  208. "info_type_cn": cn_type,
  209. "rating": item.get("rating", ""),
  210. "entity_name": item.get("entityFullName", ""),
  211. "url": _item_original_url(item),
  212. })
  213. result["success"] = True
  214. result["total_count"] = len(parsed_items)
  215. result["items"] = parsed_items
  216. return result
  217. def _fetch_mx_search_live(query: str) -> dict:
  218. """直连 mx-search;MX_REPLAY_FIXTURES 时优先读本地原始 JSON"""
  219. raw_fixture = try_load_raw_fixture("mx_search", query)
  220. if raw_fixture is not None:
  221. try:
  222. return _mx_search_from_raw(query, raw_fixture)
  223. except Exception as e:
  224. return {
  225. "success": False,
  226. "query": query,
  227. "total_count": 0,
  228. "items": [],
  229. "error": f"[fixture] {e}",
  230. }
  231. key_ok = bool(settings.MX_APIKEY and settings.MX_APIKEY != "your-mx-apikey-here")
  232. if not key_ok:
  233. return {
  234. "success": False,
  235. "query": query,
  236. "total_count": 0,
  237. "items": [],
  238. "error": "MX_APIKEY 未配置,且无匹配的本地 fixture",
  239. }
  240. try:
  241. import mx_search as _mx
  242. search_client = _mx.MXSearch(api_key=settings.MX_APIKEY)
  243. raw_result = search_client.search(query)
  244. return _mx_search_from_raw(query, raw_result)
  245. except Exception as e:
  246. return {
  247. "success": False,
  248. "query": query,
  249. "total_count": 0,
  250. "items": [],
  251. "error": str(e),
  252. }
  253. def search_news(query: str) -> dict:
  254. """搜索金融资讯(同一 query 在 TTL 内走缓存;关键词/个股不同则 query 不同)"""
  255. result = {
  256. "success": False,
  257. "query": query,
  258. "total_count": 0,
  259. "items": [],
  260. "error": None,
  261. }
  262. key_missing = not settings.MX_APIKEY or settings.MX_APIKEY == "your-mx-apikey-here"
  263. if key_missing and not settings.MX_REPLAY_FIXTURES:
  264. result["error"] = "MX_APIKEY 未配置"
  265. return result
  266. cache = get_mx_timed_cache()
  267. ttl = mx_cache_ttl_seconds()
  268. key = cache.make_key("mx_search", query)
  269. fresh = cache.get_fresh(key, ttl)
  270. if fresh is not None:
  271. return _attach(fresh, _meta_block(from_cache=True, quota_exhausted=False, channel="mx_search"))
  272. live = _fetch_mx_search_live(query)
  273. if live["success"]:
  274. cache.set(key, live)
  275. return _attach(live, _meta_block(from_cache=False, quota_exhausted=False, channel="mx_search"))
  276. err = live.get("error") or ""
  277. if is_mx_quota_exhausted(err):
  278. stale = cache.get_stale(key)
  279. if stale:
  280. merged = copy.deepcopy(stale)
  281. merged["success"] = True
  282. merged["query"] = query
  283. return _attach(merged, _meta_block(from_cache=True, quota_exhausted=True, channel="mx_search"))
  284. live["error"] = quota_exhausted_no_cache_message(err)
  285. return live
  286. return live
  287. def search_stock_news(code: str) -> dict:
  288. """搜索个股相关资讯"""
  289. query = f"{code} 最新研报 新闻 公告"
  290. return search_news(query)
  291. def search_sector_news(sector: str) -> dict:
  292. """搜索行业/板块相关资讯"""
  293. query = f"{sector}板块近期新闻 政策解读"
  294. return search_news(query)
  295. def search_market_news() -> dict:
  296. """搜索市场热门资讯"""
  297. query = "今日A股市场热点 大盘动态 北向资金"
  298. return search_news(query)
  299. def analyze_sentiment(code: str) -> dict:
  300. """个股舆情分析(文件缓存优先 + 继承底层 search 的 _mx_meta)"""
  301. from app.services.stock_file_cache import get_stock_file_cache
  302. fc = get_stock_file_cache()
  303. cached = fc.get(code, "sentiment")
  304. if cached and cached.get("data") and cached["data"].get("success"):
  305. return cached["data"]
  306. base = search_stock_news(code)
  307. meta = base.get("_mx_meta")
  308. if not base["success"]:
  309. out = {
  310. "success": False,
  311. "code": code,
  312. "total_count": 0,
  313. "news_items": [],
  314. "report_items": [],
  315. "announce_items": [],
  316. "error": base["error"],
  317. }
  318. if meta:
  319. out["_mx_meta"] = meta
  320. return out
  321. items = base["items"]
  322. report_items = [i for i in items if i["info_type"] == "REPORT"]
  323. announce_items = [i for i in items if i["info_type"] == "ANNOUNCEMENT"]
  324. news_items = [i for i in items if i["info_type"] not in ("REPORT", "ANNOUNCEMENT")]
  325. out = {
  326. "success": True,
  327. "code": code,
  328. "total_count": base["total_count"],
  329. "news_items": news_items,
  330. "report_items": report_items,
  331. "announce_items": announce_items,
  332. "error": None,
  333. }
  334. if meta:
  335. out["_mx_meta"] = meta
  336. fc.set(code, "sentiment", out)
  337. return out