mx_timed_cache.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. """
  2. 妙想类接口计时缓存(进程内)
  3. - 默认 TTL 10 分钟内相同查询直接返回缓存,减少重复调用。
  4. - 额度用尽时可用过期缓存降级(见各 service 封装)。
  5. """
  6. from __future__ import annotations
  7. import copy
  8. import hashlib
  9. import threading
  10. import time
  11. from typing import Any, Optional
  12. from app.config import settings
  13. class MXTimedCache:
  14. """线程安全的查询串 -> 结果快照缓存"""
  15. def __init__(self, max_entries: int = 400):
  16. self._max = max_entries
  17. self._lock = threading.Lock()
  18. # key -> (monotonic_ts, payload)
  19. self._data: dict[str, tuple[float, Any]] = {}
  20. self._order: list[str] = []
  21. @staticmethod
  22. def normalize_query(q: str) -> str:
  23. return " ".join((q or "").split())
  24. def make_key(self, channel: str, query: str) -> str:
  25. n = self.normalize_query(query)
  26. digest = hashlib.sha256(n.encode("utf-8")).hexdigest()[:32]
  27. return f"{channel}:{digest}"
  28. def get_fresh(self, key: str, ttl_seconds: float) -> Optional[Any]:
  29. if ttl_seconds <= 0:
  30. return None
  31. with self._lock:
  32. ent = self._data.get(key)
  33. if not ent:
  34. return None
  35. ts, payload = ent
  36. if time.monotonic() - ts > ttl_seconds:
  37. return None
  38. return copy.deepcopy(payload)
  39. def get_stale(self, key: str) -> Optional[Any]:
  40. """不按 TTL,只要有快照即返回(用于额度耗尽降级)"""
  41. with self._lock:
  42. ent = self._data.get(key)
  43. if not ent:
  44. return None
  45. return copy.deepcopy(ent[1])
  46. def set(self, key: str, payload: Any) -> None:
  47. snap = copy.deepcopy(payload)
  48. with self._lock:
  49. self._data[key] = (time.monotonic(), snap)
  50. if key in self._order:
  51. self._order.remove(key)
  52. self._order.append(key)
  53. while len(self._order) > self._max:
  54. old = self._order.pop(0)
  55. self._data.pop(old, None)
  56. def delete(self, key: str) -> None:
  57. """主动失效(如自选股增删后勿长期使用旧列表快照)"""
  58. with self._lock:
  59. self._data.pop(key, None)
  60. if key in self._order:
  61. self._order.remove(key)
  62. _mx_cache_singleton: Optional[MXTimedCache] = None
  63. def get_mx_timed_cache() -> MXTimedCache:
  64. global _mx_cache_singleton
  65. if _mx_cache_singleton is None:
  66. _mx_cache_singleton = MXTimedCache()
  67. return _mx_cache_singleton
  68. def mx_cache_ttl_seconds() -> float:
  69. return float(getattr(settings, "MX_CACHE_TTL_SECONDS", 600))