market_service.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. """
  2. 智能股票分析助手 — 行情数据服务层
  3. 封装金融数据查询、解析和格式化逻辑,供API路由层调用。
  4. 含妙想 mx_data 计时缓存与额度用尽时的缓存降级。
  5. """
  6. from __future__ import annotations
  7. import copy
  8. import math
  9. import re
  10. import sys
  11. from pathlib import Path
  12. from typing import Any, Optional
  13. # 确保skills路径可导入
  14. _PROJECT_ROOT = Path(__file__).parent.parent.parent.parent # backend/app/services -> project root
  15. _AGENTS_DIR = _PROJECT_ROOT / "agents"
  16. _SKILLS_DATA = _PROJECT_ROOT / "skills" / "金融数据" / "mx-data"
  17. for p in [_AGENTS_DIR, _SKILLS_DATA, str(_PROJECT_ROOT)]:
  18. if str(p) not in sys.path:
  19. sys.path.insert(0, str(p))
  20. from app.config import settings
  21. from app.services.mx_timed_cache import get_mx_timed_cache, mx_cache_ttl_seconds
  22. from app.utils.mx_fixture import try_load_raw_fixture
  23. from app.utils.mx_quota import MX_QUOTA_HINT, is_mx_quota_exhausted, quota_exhausted_no_cache_message
  24. # 仪表盘指数卡片:从 mx tables 中解析列名(妙想返回的表头差异较大)
  25. _PRICE_HDR = re.compile(
  26. r"点位|最新点|收盘点|指数点位|收盘价|最新价|现价|收盘|价格|数值|行情|昨收|今开|当前价|最新报价|报价",
  27. re.I,
  28. )
  29. _CHANGE_HDR = re.compile(
  30. r"涨跌幅|涨跌幅度|当日涨幅|日涨跌幅|涨跌|涨幅|变动率",
  31. re.I,
  32. )
  33. _DATE_HDR = re.compile(r"日期|时间|^date$", re.I)
  34. _LONG_PRICE_LABEL = re.compile(
  35. r"点位|最新点|收盘点|指数点位|收盘价|最新价|现价|收盘|价格|最新|上证|深证|成指|沪深300|创业板指",
  36. )
  37. _LONG_CHANGE_LABEL = re.compile(r"涨跌幅|涨跌幅度|当日涨幅|涨跌|涨幅|变动率")
  38. def _parse_pct_cell(val: Any) -> Optional[float]:
  39. """解析涨跌幅单元格为浮点数(百分比数值,不带 % 也可)"""
  40. if val is None:
  41. return None
  42. if isinstance(val, (int, float)):
  43. x = float(val)
  44. return x if math.isfinite(x) else None
  45. s = str(val).strip()
  46. if not s or s in ("--", "—", "-", "暂无"):
  47. return None
  48. s = re.sub(r"[%%,,]", "", s)
  49. s = s.replace("+", "+").replace("−", "-").replace("-", "-")
  50. m = re.search(r"[-+]?\d+(?:\.\d+)?(?:e[-+]?\d+)?", s, re.I)
  51. if not m:
  52. return None
  53. try:
  54. x = float(m.group(0))
  55. except ValueError:
  56. return None
  57. return x if math.isfinite(x) else None
  58. def _cell_at(names: list, row: dict, idx: int) -> Any:
  59. if idx < 0 or not isinstance(row, dict):
  60. return None
  61. if idx >= len(names):
  62. return None
  63. return row.get(names[idx])
  64. def _heuristic_index_from_row(row: dict, names: list[str]) -> tuple[Optional[str], Optional[float]]:
  65. """列名无法识别时,按数值量级与 % 符号兜底"""
  66. change: Optional[float] = None
  67. price_disp: Optional[str] = None
  68. best: float = -1.0
  69. keys = [n for n in names if n in row] if names else list(row.keys())
  70. for k in keys:
  71. if _DATE_HDR.search(str(k)):
  72. continue
  73. raw = row[k]
  74. if raw is None:
  75. continue
  76. sv = str(raw).strip()
  77. if not sv:
  78. continue
  79. if "%" in sv or "%" in sv:
  80. p = _parse_pct_cell(sv)
  81. if p is not None:
  82. change = p
  83. continue
  84. p = _parse_pct_cell(sv)
  85. if p is None:
  86. continue
  87. # A 股主要指数点位多在数百~数万之间
  88. if 500 <= abs(p) <= 50000:
  89. if abs(p) > best:
  90. best = abs(p)
  91. price_disp = sv
  92. return price_disp, change
  93. def _extract_index_card_one_table(t0: dict) -> tuple[Optional[str], Optional[float]]:
  94. """从 mx_data 单个 sheet 解析点位与涨跌幅(字段名与行结构随品种变化较大)"""
  95. names: list = list(t0.get("fieldnames") or t0.get("fieldNames") or [])
  96. rows = t0.get("rows") or []
  97. if not rows:
  98. return None, None
  99. # 长表:恰两列,每行一个指标
  100. if len(names) == 2:
  101. lk, vk = names[0], names[1]
  102. price_s: Optional[str] = None
  103. change_v: Optional[float] = None
  104. for r in rows:
  105. if not isinstance(r, dict):
  106. continue
  107. lab = str(r.get(lk, "")).strip()
  108. raw = r.get(vk)
  109. if _LONG_CHANGE_LABEL.search(lab):
  110. c = _parse_pct_cell(raw)
  111. if c is not None:
  112. change_v = c
  113. elif _LONG_PRICE_LABEL.search(lab):
  114. if price_s is None and raw is not None and str(raw).strip():
  115. price_s = str(raw).strip()
  116. if price_s or change_v is not None:
  117. return price_s, change_v
  118. date_idx = next((i for i, n in enumerate(names) if _DATE_HDR.search(str(n))), -1)
  119. data_row = rows[-1] if date_idx >= 0 and len(rows) > 1 else rows[0]
  120. if isinstance(data_row, list):
  121. pi = next((i for i, n in enumerate(names) if _PRICE_HDR.search(str(n))), -1)
  122. ci = next((i for i, n in enumerate(names) if _CHANGE_HDR.search(str(n))), -1)
  123. raw_p = data_row[pi] if 0 <= pi < len(data_row) else None
  124. raw_c = data_row[ci] if 0 <= ci < len(data_row) else None
  125. ps = str(raw_p).strip() if raw_p is not None and str(raw_p).strip() else None
  126. cv = _parse_pct_cell(raw_c)
  127. if ps or cv is not None:
  128. return ps, cv
  129. return None, None
  130. if isinstance(data_row, dict):
  131. pi = next((i for i, n in enumerate(names) if _PRICE_HDR.search(str(n))), -1)
  132. ci = next((i for i, n in enumerate(names) if _CHANGE_HDR.search(str(n))), -1)
  133. raw_p = _cell_at(names, data_row, pi)
  134. raw_c = _cell_at(names, data_row, ci)
  135. ps = str(raw_p).strip() if raw_p is not None and str(raw_p).strip() else None
  136. cv = _parse_pct_cell(raw_c)
  137. if ps or cv is not None:
  138. return ps, cv
  139. return _heuristic_index_from_row(data_row, names)
  140. return None, None
  141. def _extract_index_card_from_tables(tables: list) -> tuple[Optional[str], Optional[float]]:
  142. """从 mx_data 全部 sheet 解析仪表盘展示字段(妙想常返回多表,首表可能是说明性空表)"""
  143. if not tables:
  144. return None, None
  145. merged_p: Optional[str] = None
  146. merged_c: Optional[float] = None
  147. for t in tables:
  148. if not isinstance(t, dict):
  149. continue
  150. p, c = _extract_index_card_one_table(t)
  151. if p is not None and c is not None:
  152. return p, c
  153. if merged_p is None and p is not None:
  154. merged_p = p
  155. if merged_c is None and c is not None:
  156. merged_c = c
  157. if merged_p is not None and merged_c is not None:
  158. return merged_p, merged_c
  159. return merged_p, merged_c
  160. def _enrich_index_quote_result(result: dict) -> dict:
  161. """成功返回时附加 display_*,供前端直接使用;deepcopy 避免污染进程内缓存对象"""
  162. if not result.get("success"):
  163. return result
  164. out = copy.deepcopy(result)
  165. price, chg = _extract_index_card_from_tables(out.get("tables") or [])
  166. if price is not None:
  167. out["display_price"] = price
  168. if chg is not None:
  169. out["display_change_pct"] = chg
  170. return out
  171. def _mx_result_meta(*, from_cache: bool, quota_exhausted: bool, channel: str) -> dict:
  172. ttl = int(mx_cache_ttl_seconds())
  173. m = {
  174. "from_cache": from_cache,
  175. "quota_exhausted": quota_exhausted,
  176. "cache_ttl_seconds": ttl,
  177. "channel": channel,
  178. }
  179. if quota_exhausted:
  180. m["hint"] = MX_QUOTA_HINT
  181. return m
  182. def _attach_meta(payload: dict, meta: dict) -> dict:
  183. out = copy.deepcopy(payload)
  184. out["_mx_meta"] = meta
  185. return out
  186. def _fetch_mx_data_live(query: str) -> dict:
  187. """直连妙想 mx_data;MX_REPLAY_FIXTURES 时优先读本地原始 JSON(不修额度)"""
  188. import mx_data as _mx
  189. result = {
  190. "success": False,
  191. "query": query,
  192. "tables": [],
  193. "condition_parts": [],
  194. "total_rows": 0,
  195. "error": None,
  196. }
  197. raw_fixture = try_load_raw_fixture("mx_data", query)
  198. if raw_fixture is not None:
  199. try:
  200. tables, condition_parts, total_rows, error = _mx.MXData.parse_result(raw_fixture)
  201. if error:
  202. result["error"] = f"[fixture] {error}"
  203. return result
  204. result["success"] = True
  205. result["tables"] = tables
  206. result["condition_parts"] = condition_parts
  207. result["total_rows"] = total_rows
  208. return result
  209. except Exception as e:
  210. result["error"] = f"[fixture] 解析失败: {e}"
  211. return result
  212. key_ok = bool(settings.MX_APIKEY and settings.MX_APIKEY != "your-mx-apikey-here")
  213. if not key_ok:
  214. result["error"] = "MX_APIKEY 未配置,且无匹配的本地 fixture(设置 MX_REPLAY_FIXTURES=1 并放置 JSON)"
  215. return result
  216. try:
  217. querier = _mx.MXData(api_key=settings.MX_APIKEY)
  218. raw_result = querier.query(query)
  219. tables, condition_parts, total_rows, error = _mx.MXData.parse_result(raw_result)
  220. if error:
  221. result["error"] = error
  222. return result
  223. result["success"] = True
  224. result["tables"] = tables
  225. result["condition_parts"] = condition_parts
  226. result["total_rows"] = total_rows
  227. return result
  228. except Exception as e:
  229. result["error"] = str(e)
  230. return result
  231. def query_financial_data(query: str) -> dict:
  232. """执行金融数据查询并返回结构化结果(带计时缓存与额度降级)
  233. 缓存键为规范化后的自然语言 query:
  234. - 相同查询串在 TTL 内不会重复请求妙想;
  235. - 股票代码 / 财务指标不同会得到不同 query,从而自动区分。
  236. """
  237. result = {
  238. "success": False,
  239. "query": query,
  240. "tables": [],
  241. "condition_parts": [],
  242. "total_rows": 0,
  243. "error": None,
  244. }
  245. key_missing = not settings.MX_APIKEY or settings.MX_APIKEY == "your-mx-apikey-here"
  246. if key_missing and not settings.MX_REPLAY_FIXTURES:
  247. result["error"] = "MX_APIKEY 未配置"
  248. return result
  249. cache = get_mx_timed_cache()
  250. ttl = mx_cache_ttl_seconds()
  251. key = cache.make_key("mx_data", query)
  252. fresh = cache.get_fresh(key, ttl)
  253. if fresh is not None:
  254. return _attach_meta(
  255. fresh,
  256. _mx_result_meta(from_cache=True, quota_exhausted=False, channel="mx_data"),
  257. )
  258. live = _fetch_mx_data_live(query)
  259. if live["success"]:
  260. cache.set(key, live)
  261. return _attach_meta(
  262. live,
  263. _mx_result_meta(from_cache=False, quota_exhausted=False, channel="mx_data"),
  264. )
  265. err = live.get("error") or ""
  266. if is_mx_quota_exhausted(err):
  267. stale = cache.get_stale(key)
  268. if stale:
  269. merged = copy.deepcopy(stale)
  270. merged["success"] = True
  271. merged["query"] = query
  272. return _attach_meta(
  273. merged,
  274. _mx_result_meta(from_cache=True, quota_exhausted=True, channel="mx_data"),
  275. )
  276. live["error"] = quota_exhausted_no_cache_message(err)
  277. return live
  278. return live
  279. def get_stock_quote(code: str) -> dict:
  280. """查询个股实时行情
  281. 一并请求 OHLC/昨收,供前端「当日价位快照」图使用;仅查最新价时五价合一易变成一条平线。
  282. 优先从文件缓存读取,未命中或过期才调用接口。
  283. """
  284. from app.services.stock_file_cache import get_stock_file_cache
  285. fc = get_stock_file_cache()
  286. cached = fc.get(code, "quote")
  287. if cached and cached.get("data"):
  288. cached_data = cached["data"]
  289. if cached_data.get("success"):
  290. return cached_data
  291. extra = "今开 开盘 最高 最低 昨收 昨收盘价"
  292. if code.startswith(("6", "5", "9")):
  293. query = f"{code} 最新价 涨跌幅 涨跌额 {extra} 成交量 成交额 换手率"
  294. else:
  295. query = f"{code} 最新价 涨跌幅 涨跌额 {extra} 成交量 成交额 换手率"
  296. result = query_financial_data(query)
  297. if result.get("success"):
  298. fc.set(code, "quote", result)
  299. return result
  300. def get_stock_financial(code: str, indicators: str = "净利润 营业收入 净资产收益率 每股收益") -> dict:
  301. """查询个股财务指标(文件缓存优先)"""
  302. from app.services.stock_file_cache import get_stock_file_cache
  303. fc = get_stock_file_cache()
  304. cached = fc.get(code, "financial")
  305. if cached and cached.get("data") and cached["data"].get("success"):
  306. return cached["data"]
  307. query = f"{code} {indicators}"
  308. result = query_financial_data(query)
  309. if result.get("success"):
  310. fc.set(code, "financial", result)
  311. return result
  312. def get_stock_profile(code: str) -> dict:
  313. """查询公司概况(文件缓存优先)"""
  314. from app.services.stock_file_cache import get_stock_file_cache
  315. fc = get_stock_file_cache()
  316. cached = fc.get(code, "profile")
  317. if cached and cached.get("data") and cached["data"].get("success"):
  318. return cached["data"]
  319. query = f"{code} 公司简介 主营业务 成立时间 董事长 总股本"
  320. result = query_financial_data(query)
  321. if result.get("success"):
  322. fc.set(code, "profile", result)
  323. return result
  324. def get_stock_holders(code: str) -> dict:
  325. """查询十大股东(文件缓存优先)"""
  326. from app.services.stock_file_cache import get_stock_file_cache
  327. fc = get_stock_file_cache()
  328. cached = fc.get(code, "holders")
  329. if cached and cached.get("data") and cached["data"].get("success"):
  330. return cached["data"]
  331. query = f"{code} 十大股东"
  332. result = query_financial_data(query)
  333. if result.get("success"):
  334. fc.set(code, "holders", result)
  335. return result
  336. def get_index_quote(index_name: str = "沪深300") -> dict:
  337. """查询指数行情(附带 display_price / display_change_pct 供仪表盘稳定展示)"""
  338. # 避免「上证指数指数」重复;自然语言尽量简短明确
  339. query = f"{index_name} 最新点位 涨跌幅"
  340. base = query_financial_data(query)
  341. return _enrich_index_quote_result(base)
  342. def get_sector_quote(sector_name: str) -> dict:
  343. """查询板块行情"""
  344. query = f"{sector_name}板块最新行情"
  345. return query_financial_data(query)