watchlist_service.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. """
  2. 智能股票分析助手 — 自选股管理服务层
  3. 封装自选股查询、添加、删除的数据查询逻辑,供API路由层调用。
  4. """
  5. import sys
  6. from pathlib import Path
  7. # 确保skills路径可导入
  8. _PROJECT_ROOT = Path(__file__).parent.parent.parent.parent # backend/app/services -> project root
  9. _AGENTS_DIR = _PROJECT_ROOT / "agents"
  10. _SKILLS_ZIXUAN = _PROJECT_ROOT / "skills" / "自选股管理" / "mx-zixuan"
  11. for p in [_AGENTS_DIR, _SKILLS_ZIXUAN, str(_PROJECT_ROOT)]:
  12. if str(p) not in sys.path:
  13. sys.path.insert(0, str(p))
  14. from app.config import settings
  15. from app.services.mx_timed_cache import get_mx_timed_cache, mx_cache_ttl_seconds
  16. # 与 mx_data / mx_search 共用 TTL(默认 600s):逾时才再打妙想侧自选接口
  17. _WATCHLIST_CACHE_QUERY = "mx_zixuan_self_select_list"
  18. def _watchlist_cache_key() -> str:
  19. return get_mx_timed_cache().make_key("mx_zixuan", _WATCHLIST_CACHE_QUERY)
  20. def _invalidate_watchlist_cache() -> None:
  21. get_mx_timed_cache().delete(_watchlist_cache_key())
  22. def _get_mx_zixuan_instance():
  23. """创建mx_zixuan API调用实例"""
  24. import mx_zixuan as _mx
  25. return _mx
  26. def get_watchlist() -> dict:
  27. """查询自选股列表
  28. Returns:
  29. {
  30. "success": True/False,
  31. "stocks": [{"code": str, "name": str, "price": float, ...}, ...],
  32. "total": int,
  33. "error": str or None
  34. }
  35. """
  36. import mx_zixuan as _mx
  37. ttl = mx_cache_ttl_seconds()
  38. if ttl > 0:
  39. cached = get_mx_timed_cache().get_fresh(_watchlist_cache_key(), ttl)
  40. if cached is not None:
  41. return cached
  42. result = {
  43. "success": False,
  44. "stocks": [],
  45. "total": 0,
  46. "error": None,
  47. }
  48. if not settings.MX_APIKEY or settings.MX_APIKEY == "your-mx-apikey-here":
  49. result["error"] = "MX_APIKEY 未配置"
  50. return result
  51. try:
  52. raw_result = _mx.query_self_select(settings.MX_APIKEY)
  53. # 检查API状态
  54. status = raw_result.get("status", -1)
  55. code = raw_result.get("code", -1)
  56. if status != 0 and code != 0:
  57. result["error"] = raw_result.get("message", "查询自选股失败")
  58. return result
  59. # 解析查询结果
  60. data = raw_result.get("data", {})
  61. all_results = data.get("allResults", {})
  62. result_data = all_results.get("result", {})
  63. data_list = result_data.get("dataList", [])
  64. stocks = []
  65. for stock in (data_list or []):
  66. stocks.append({
  67. "code": stock.get("SECURITY_CODE", ""),
  68. "name": stock.get("SECURITY_SHORT_NAME", ""),
  69. "price": stock.get("NEWEST_PRICE", ""),
  70. "change_pct": stock.get("CHG", ""),
  71. "change_amount": stock.get("PCHG", ""),
  72. "turnover_rate": stock.get("010000_TURNOVER_RATE", ""),
  73. "volume_ratio": stock.get("010000_LIANGBI", ""),
  74. })
  75. result["success"] = True
  76. result["stocks"] = stocks
  77. result["total"] = len(stocks)
  78. if ttl > 0:
  79. get_mx_timed_cache().set(_watchlist_cache_key(), result)
  80. return result
  81. except Exception as e:
  82. result["error"] = str(e)
  83. return result
  84. def add_to_watchlist(stock_input: str) -> dict:
  85. """添加股票到自选股
  86. Args:
  87. stock_input: 股票名称或代码,如 "贵州茅台" 或 "600519"
  88. Returns:
  89. {
  90. "success": True/False,
  91. "message": str,
  92. "error": str or None
  93. }
  94. """
  95. import mx_zixuan as _mx
  96. result = {
  97. "success": False,
  98. "message": "",
  99. "error": None,
  100. }
  101. if not settings.MX_APIKEY or settings.MX_APIKEY == "your-mx-apikey-here":
  102. result["error"] = "MX_APIKEY 未配置"
  103. return result
  104. try:
  105. # 构造自然语言添加指令
  106. query = f"把{stock_input}添加到我的自选股列表"
  107. raw_result = _mx.manage_self_select(settings.MX_APIKEY, query)
  108. status = raw_result.get("status", -1)
  109. code = raw_result.get("code", -1)
  110. if status != 0 and code != 0:
  111. result["error"] = raw_result.get("message", "添加自选股失败")
  112. return result
  113. result["success"] = True
  114. result["message"] = raw_result.get("message", f"已将 {stock_input} 加入自选")
  115. _invalidate_watchlist_cache()
  116. return result
  117. except Exception as e:
  118. result["error"] = str(e)
  119. return result
  120. def delete_from_watchlist(stock_input: str) -> dict:
  121. """从自选股中删除股票
  122. Args:
  123. stock_input: 股票名称或代码,如 "贵州茅台" 或 "600519"
  124. Returns:
  125. {
  126. "success": True/False,
  127. "message": str,
  128. "error": str or None
  129. }
  130. """
  131. import mx_zixuan as _mx
  132. result = {
  133. "success": False,
  134. "message": "",
  135. "error": None,
  136. }
  137. if not settings.MX_APIKEY or settings.MX_APIKEY == "your-mx-apikey-here":
  138. result["error"] = "MX_APIKEY 未配置"
  139. return result
  140. try:
  141. # 构造自然语言删除指令
  142. query = f"把{stock_input}从我的自选股列表删除"
  143. raw_result = _mx.manage_self_select(settings.MX_APIKEY, query)
  144. status = raw_result.get("status", -1)
  145. code = raw_result.get("code", -1)
  146. if status != 0 and code != 0:
  147. result["error"] = raw_result.get("message", "删除自选股失败")
  148. return result
  149. result["success"] = True
  150. result["message"] = raw_result.get("message", f"已将 {stock_input} 从自选中移除")
  151. _invalidate_watchlist_cache()
  152. return result
  153. except Exception as e:
  154. result["error"] = str(e)
  155. return result