stock_file_cache.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. """
  2. 股票数据文件缓存服务 — 每只股票的所有数据存储到本地文件
  3. 每次获取数据时优先从本地文件读取,未命中或过期才调用接口。
  4. 支持 grep 风格的文件内容搜索。
  5. """
  6. from __future__ import annotations
  7. import json
  8. import logging
  9. import os
  10. import subprocess
  11. import threading
  12. from datetime import date, datetime
  13. from pathlib import Path
  14. from typing import Optional, Any
  15. from app.config import settings
  16. logger = logging.getLogger(__name__)
  17. _cache_lock = threading.Lock()
  18. # 缓存根目录
  19. _STOCK_CACHE_ROOT = settings.DATA_DIR / "stock_cache"
  20. # 各数据类型的文件名
  21. _DATA_TYPE_FILES = {
  22. "quote": "quote.json",
  23. "financial": "financial.json",
  24. "profile": "profile.json",
  25. "holders": "holders.json",
  26. "sentiment": "sentiment.json",
  27. "news": "news.json",
  28. }
  29. # 当日缓存有效期(同一只股票同一天只调一次接口)
  30. _TODAY = date.today().isoformat()
  31. class StockFileCache:
  32. """股票数据文件缓存"""
  33. def __init__(self):
  34. _STOCK_CACHE_ROOT.mkdir(parents=True, exist_ok=True)
  35. self._index_file = _STOCK_CACHE_ROOT / "_index.json"
  36. self._index: dict = {}
  37. self._load_index()
  38. # ---- 索引管理 ----
  39. def _load_index(self):
  40. """加载主索引"""
  41. try:
  42. if self._index_file.exists():
  43. self._index = json.loads(self._index_file.read_text(encoding="utf-8"))
  44. logger.debug("文件缓存索引已加载: %d 条", len(self._index))
  45. except Exception:
  46. self._index = {}
  47. def _save_index(self):
  48. """保存主索引"""
  49. try:
  50. self._index_file.write_text(json.dumps(self._index, ensure_ascii=False, indent=2), encoding="utf-8")
  51. except Exception as e:
  52. logger.warning("保存缓存索引失败: %s", e)
  53. def _stock_dir(self, stock_code: str) -> Path:
  54. clean = stock_code.strip().upper()
  55. d = _STOCK_CACHE_ROOT / clean
  56. d.mkdir(parents=True, exist_ok=True)
  57. return d
  58. def _data_file(self, stock_code: str, data_type: str) -> Path:
  59. filename = _DATA_TYPE_FILES.get(data_type, f"{data_type}.json")
  60. return self._stock_dir(stock_code) / filename
  61. def _update_index(self, stock_code: str, data_type: str):
  62. code = stock_code.strip().upper()
  63. if code not in self._index:
  64. self._index[code] = {"data_types": [], "cached_at": datetime.now().isoformat()}
  65. if data_type not in self._index[code]["data_types"]:
  66. self._index[code]["data_types"].append(data_type)
  67. self._index[code]["cached_at"] = datetime.now().isoformat()
  68. # ---- 读写操作 ----
  69. def get(self, stock_code: str, data_type: str, max_age_hours: int = 24) -> Optional[dict]:
  70. """
  71. 从文件缓存读取数据
  72. Args:
  73. stock_code: 股票代码
  74. data_type: 数据类型 (quote/financial/profile/holders/sentiment/news)
  75. max_age_hours: 最大有效时长(小时),超过则视为过期
  76. Returns:
  77. 缓存数据字典,未命中或过期返回 None
  78. """
  79. filepath = self._data_file(stock_code, data_type)
  80. if not filepath.exists():
  81. return None
  82. # 检查文件时效
  83. mtime = datetime.fromtimestamp(filepath.stat().st_mtime)
  84. age_hours = (datetime.now() - mtime).total_seconds() / 3600
  85. file_date = mtime.strftime("%Y-%m-%d")
  86. # 当日数据直接返回(不限小时数)
  87. if file_date == _TODAY:
  88. pass
  89. elif age_hours > max_age_hours:
  90. logger.debug("缓存过期: %s/%s (%.1f小时前)", stock_code, data_type, age_hours)
  91. return None
  92. try:
  93. data = json.loads(filepath.read_text(encoding="utf-8"))
  94. logger.debug("文件缓存命中: %s/%s", stock_code, data_type)
  95. return data
  96. except Exception as e:
  97. logger.warning("读取缓存文件失败 %s: %s", filepath, e)
  98. return None
  99. def set(self, stock_code: str, data_type: str, data: dict) -> bool:
  100. """
  101. 将数据写入文件缓存
  102. Args:
  103. stock_code: 股票代码
  104. data_type: 数据类型
  105. data: 数据字典
  106. Returns:
  107. 是否写入成功
  108. """
  109. filepath = self._data_file(stock_code, data_type)
  110. try:
  111. wrapper = {
  112. "stock_code": stock_code,
  113. "data_type": data_type,
  114. "cached_at": datetime.now().isoformat(),
  115. "cache_date": _TODAY,
  116. "data": data,
  117. }
  118. filepath.write_text(json.dumps(wrapper, ensure_ascii=False, indent=2), encoding="utf-8")
  119. self._update_index(stock_code, data_type)
  120. self._save_index()
  121. logger.debug("文件缓存写入: %s/%s", stock_code, data_type)
  122. return True
  123. except Exception as e:
  124. logger.warning("写入缓存文件失败 %s: %s", filepath, e)
  125. return False
  126. def has(self, stock_code: str, data_type: str) -> bool:
  127. """检查是否存在有效缓存"""
  128. filepath = self._data_file(stock_code, data_type)
  129. if not filepath.exists():
  130. return False
  131. mtime = datetime.fromtimestamp(filepath.stat().st_mtime)
  132. return mtime.strftime("%Y-%m-%d") == _TODAY
  133. # ---- grep 风格搜索 ----
  134. def grep_search(self, keyword: str, data_type: Optional[str] = None) -> list[dict]:
  135. """
  136. 在所有缓存文件中搜索关键词(类似 grep)
  137. Args:
  138. keyword: 搜索关键词
  139. data_type: 限定数据类型,None 为全部
  140. Returns:
  141. 匹配结果列表 [{stock_code, data_type, file_path, line: str, ...}]
  142. """
  143. results = []
  144. keyword_lower = keyword.lower()
  145. # 先查索引快速定位候选股票
  146. candidates = []
  147. for code, info in self._index.items():
  148. if keyword_lower in code.lower():
  149. candidates.append(code)
  150. continue
  151. types = info.get("data_types", [])
  152. if data_type and data_type not in types:
  153. continue
  154. candidates.append(code)
  155. # 对候选股票目录做内容 grep
  156. for code in candidates:
  157. stock_dir = self._stock_dir(code)
  158. if not stock_dir.exists():
  159. continue
  160. for fname in stock_dir.glob("*.json"):
  161. dtype = fname.stem
  162. if data_type and dtype != data_type:
  163. continue
  164. try:
  165. content = fname.read_text(encoding="utf-8")
  166. if keyword_lower in content.lower():
  167. # 提取匹配行
  168. lines = content.split("\n")
  169. matched_lines = [l.strip() for l in lines if keyword_lower in l.lower()]
  170. results.append({
  171. "stock_code": code,
  172. "data_type": dtype,
  173. "file_path": str(fname),
  174. "matched_lines": matched_lines[:10],
  175. "match_count": len(matched_lines),
  176. "cached_at": datetime.fromtimestamp(fname.stat().st_mtime).isoformat(),
  177. })
  178. except Exception:
  179. continue
  180. return results
  181. def get_stock_codes(self) -> list[str]:
  182. """获取所有已缓存的股票代码"""
  183. return list(self._index.keys())
  184. def get_stock_data_types(self, stock_code: str) -> list[str]:
  185. """获取某股票已缓存的数据类型"""
  186. info = self._index.get(stock_code.upper(), {})
  187. return info.get("data_types", [])
  188. def clear_stock_cache(self, stock_code: Optional[str] = None):
  189. """清除缓存"""
  190. if stock_code:
  191. stock_dir = self._stock_dir(stock_code)
  192. for f in stock_dir.glob("*.json"):
  193. try:
  194. f.unlink()
  195. except Exception:
  196. pass
  197. code = stock_code.upper()
  198. self._index.pop(code, None)
  199. self._save_index()
  200. else:
  201. for f in _STOCK_CACHE_ROOT.glob("**/*.json"):
  202. try:
  203. f.unlink()
  204. except Exception:
  205. pass
  206. self._index.clear()
  207. self._save_index()
  208. def get_stats(self) -> dict:
  209. """获取缓存统计"""
  210. total_files = sum(1 for _ in _STOCK_CACHE_ROOT.glob("**/*.json"))
  211. total_size = sum(f.stat().st_size for f in _STOCK_CACHE_ROOT.glob("**/*.json") if f.is_file())
  212. return {
  213. "stock_count": len(self._index),
  214. "total_files": total_files,
  215. "total_size_mb": round(total_size / 1024 / 1024, 2),
  216. "cache_root": str(_STOCK_CACHE_ROOT),
  217. }
  218. _stock_cache_instance: Optional[StockFileCache] = None
  219. def get_stock_file_cache() -> StockFileCache:
  220. """获取 StockFileCache 全局单例"""
  221. global _stock_cache_instance
  222. if _stock_cache_instance is None:
  223. with _cache_lock:
  224. if _stock_cache_instance is None:
  225. _stock_cache_instance = StockFileCache()
  226. return _stock_cache_instance