screener_service.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. """
  2. 智能股票分析助手 — 智能选股服务层
  3. 封装智能选股查询、条件解析和数据格式化逻辑。
  4. 含 mx-xuangu 计时缓存与额度用尽时的缓存降级。
  5. """
  6. from __future__ import annotations
  7. import copy
  8. import sys
  9. from pathlib import Path
  10. _PROJECT_ROOT = Path(__file__).parent.parent.parent.parent
  11. _AGENTS_DIR = _PROJECT_ROOT / "agents"
  12. _SKILLS_XUANGU = _PROJECT_ROOT / "skills" / "智能选股" / "mx-xuangu"
  13. for p in [_AGENTS_DIR, _SKILLS_XUANGU, str(_PROJECT_ROOT)]:
  14. if str(p) not in sys.path:
  15. sys.path.insert(0, str(p))
  16. from app.config import settings
  17. from app.services.mx_timed_cache import get_mx_timed_cache, mx_cache_ttl_seconds
  18. from app.utils.mx_fixture import try_load_raw_fixture
  19. from app.utils.mx_quota import MX_QUOTA_HINT, is_mx_quota_exhausted, quota_exhausted_no_cache_message
  20. def _meta_block(*, from_cache: bool, quota_exhausted: bool, channel: str) -> dict:
  21. m = {
  22. "from_cache": from_cache,
  23. "quota_exhausted": quota_exhausted,
  24. "cache_ttl_seconds": int(mx_cache_ttl_seconds()),
  25. "channel": channel,
  26. }
  27. if quota_exhausted:
  28. m["hint"] = MX_QUOTA_HINT
  29. return m
  30. def _attach(payload: dict, meta: dict) -> dict:
  31. out = copy.deepcopy(payload)
  32. out["_mx_meta"] = meta
  33. return out
  34. def _fetch_screen_live(query: str) -> dict:
  35. import mx_xuangu as _mx
  36. result = {
  37. "success": False,
  38. "query": query,
  39. "total_count": 0,
  40. "data_source": "",
  41. "stocks": [],
  42. "conditions": [],
  43. "error": None,
  44. }
  45. raw_fixture = try_load_raw_fixture("mx_xuangu", query)
  46. raw_result = raw_fixture
  47. try:
  48. if raw_result is None:
  49. key_ok = bool(settings.MX_APIKEY and settings.MX_APIKEY != "your-mx-apikey-here")
  50. if not key_ok:
  51. result["error"] = "MX_APIKEY 未配置,且无匹配的本地 fixture"
  52. return result
  53. screener = _mx.MXSelectStock(api_key=settings.MX_APIKEY)
  54. raw_result = screener.search(query)
  55. rows, data_source, error = _mx.MXSelectStock.extract_data(raw_result)
  56. if error:
  57. result["error"] = (f"[fixture] {error}" if raw_fixture is not None else error)
  58. return result
  59. data = raw_result.get("data", {})
  60. inner_data = data.get("data", {})
  61. response_conditions = inner_data.get("responseConditionList", []) or []
  62. conditions = []
  63. for cond in response_conditions:
  64. if isinstance(cond, dict):
  65. conditions.append({
  66. "describe": cond.get("describe", ""),
  67. "stock_count": cond.get("stockCount", 0),
  68. })
  69. result["success"] = True
  70. result["total_count"] = len(rows)
  71. result["data_source"] = data_source
  72. result["stocks"] = rows
  73. result["conditions"] = conditions
  74. return result
  75. except Exception as e:
  76. result["error"] = (f"[fixture] {e}" if raw_fixture is not None else str(e))
  77. return result
  78. def screen_stocks(query: str) -> dict:
  79. """执行智能选股(选股条件字符串不同则缓存键不同)"""
  80. result = {
  81. "success": False,
  82. "query": query,
  83. "total_count": 0,
  84. "data_source": "",
  85. "stocks": [],
  86. "conditions": [],
  87. "error": None,
  88. }
  89. key_missing = not settings.MX_APIKEY or settings.MX_APIKEY == "your-mx-apikey-here"
  90. if key_missing and not settings.MX_REPLAY_FIXTURES:
  91. result["error"] = "MX_APIKEY 未配置"
  92. return result
  93. cache = get_mx_timed_cache()
  94. ttl = mx_cache_ttl_seconds()
  95. key = cache.make_key("mx_xuangu", query)
  96. fresh = cache.get_fresh(key, ttl)
  97. if fresh is not None:
  98. return _attach(fresh, _meta_block(from_cache=True, quota_exhausted=False, channel="mx_xuangu"))
  99. live = _fetch_screen_live(query)
  100. if live["success"]:
  101. cache.set(key, live)
  102. return _attach(live, _meta_block(from_cache=False, quota_exhausted=False, channel="mx_xuangu"))
  103. err = live.get("error") or ""
  104. if is_mx_quota_exhausted(err):
  105. stale = cache.get_stale(key)
  106. if stale:
  107. merged = copy.deepcopy(stale)
  108. merged["success"] = True
  109. merged["query"] = query
  110. return _attach(merged, _meta_block(from_cache=True, quota_exhausted=True, channel="mx_xuangu"))
  111. live["error"] = quota_exhausted_no_cache_message(err)
  112. return live
  113. return live
  114. def get_available_conditions() -> dict:
  115. """获取常用的选股条件参考(静态说明,不调用妙想)"""
  116. return {
  117. "success": True,
  118. "categories": [
  119. {
  120. "name": "行情指标",
  121. "description": "基于实时行情数据的筛选条件",
  122. "examples": [
  123. "今日涨幅大于2%",
  124. "成交量大于10亿",
  125. "股价在10元到20元之间",
  126. "换手率大于5%",
  127. ],
  128. },
  129. {
  130. "name": "财务指标",
  131. "description": "基于财务报表数据的筛选条件",
  132. "examples": [
  133. "市盈率小于20",
  134. "市净率小于2",
  135. "ROE大于15%",
  136. "净利润增长率大于20%",
  137. "股息率大于3%",
  138. ],
  139. },
  140. {
  141. "name": "行业板块",
  142. "description": "限定行业或板块范围的筛选",
  143. "examples": [
  144. "新能源板块",
  145. "白酒板块",
  146. "半导体行业",
  147. "银行股",
  148. ],
  149. },
  150. {
  151. "name": "指数成分",
  152. "description": "指定指数成分股内筛选",
  153. "examples": [
  154. "沪深300成分股",
  155. "创业板成分股",
  156. "上证50成分股",
  157. ],
  158. },
  159. ],
  160. }