manager.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. """记忆管理器 - 记忆核心层的统一管理接口"""
  2. from typing import List, Dict, Any, Optional, Union
  3. from datetime import datetime
  4. import uuid
  5. import logging
  6. from .base import MemoryItem, MemoryConfig
  7. # 存储和检索功能已被各记忆类型内部实现替代
  8. logger = logging.getLogger(__name__)
  9. class MemoryManager:
  10. """记忆管理器 - 统一的记忆操作接口
  11. 负责:
  12. - 记忆生命周期管理
  13. - 记忆优先级和重要性评估
  14. - 记忆遗忘和清理机制
  15. - 多类型记忆的协调管理
  16. """
  17. def __init__(
  18. self,
  19. config: Optional[MemoryConfig] = None,
  20. user_id: str = "default_user",
  21. enable_working: bool = True,
  22. enable_episodic: bool = True,
  23. enable_semantic: bool = True,
  24. enable_perceptual: bool = False
  25. ):
  26. self.config = config or MemoryConfig()
  27. self.user_id = user_id
  28. # 存储和检索功能已移至各记忆类型内部实现
  29. # 初始化各类型记忆
  30. self.memory_types = {}
  31. if enable_working:
  32. from .types.working import WorkingMemory
  33. self.memory_types['working'] = WorkingMemory(self.config)
  34. if enable_episodic:
  35. from .types.episodic import EpisodicMemory
  36. self.memory_types['episodic'] = EpisodicMemory(self.config)
  37. if enable_semantic:
  38. from .types.semantic import SemanticMemory
  39. self.memory_types['semantic'] = SemanticMemory(self.config)
  40. if enable_perceptual:
  41. from .types.perceptual import PerceptualMemory
  42. self.memory_types['perceptual'] = PerceptualMemory(self.config)
  43. logger.info(f"MemoryManager初始化完成,启用记忆类型: {list(self.memory_types.keys())}")
  44. def add_memory(
  45. self,
  46. content: str,
  47. memory_type: str = "working",
  48. importance: Optional[float] = None,
  49. metadata: Optional[Dict[str, Any]] = None,
  50. auto_classify: bool = True
  51. ) -> str:
  52. """添加记忆
  53. Args:
  54. content: 记忆内容
  55. memory_type: 记忆类型
  56. importance: 重要性分数 (0-1)
  57. metadata: 元数据
  58. auto_classify: 是否自动分类到合适的记忆类型
  59. Returns:
  60. 记忆ID
  61. """
  62. # 自动分类记忆类型
  63. if auto_classify:
  64. memory_type = self._classify_memory_type(content, metadata)
  65. # 计算重要性
  66. if importance is None:
  67. importance = self._calculate_importance(content, metadata)
  68. # 创建记忆项
  69. memory_item = MemoryItem(
  70. id=str(uuid.uuid4()),
  71. content=content,
  72. memory_type=memory_type,
  73. user_id=self.user_id,
  74. timestamp=datetime.now(),
  75. importance=importance,
  76. metadata=metadata or {}
  77. )
  78. # 添加到对应的记忆类型
  79. if memory_type in self.memory_types:
  80. memory_id = self.memory_types[memory_type].add(memory_item)
  81. logger.debug(f"添加记忆到 {memory_type}: {memory_id}")
  82. return memory_id
  83. else:
  84. raise ValueError(f"不支持的记忆类型: {memory_type}")
  85. def retrieve_memories(
  86. self,
  87. query: str,
  88. memory_types: Optional[List[str]] = None,
  89. limit: int = 10,
  90. min_importance: float = 0.0,
  91. time_range: Optional[tuple] = None
  92. ) -> List[MemoryItem]:
  93. """检索记忆
  94. Args:
  95. query: 查询内容
  96. memory_types: 要检索的记忆类型列表
  97. limit: 返回数量限制
  98. min_importance: 最小重要性阈值
  99. time_range: 时间范围 (start_time, end_time)
  100. Returns:
  101. 检索到的记忆列表
  102. """
  103. if memory_types is None:
  104. memory_types = list(self.memory_types.keys())
  105. # 从各个记忆类型中检索
  106. all_results = []
  107. per_type_limit = max(1, limit // len(memory_types))
  108. for memory_type in memory_types:
  109. if memory_type in self.memory_types:
  110. memory_instance = self.memory_types[memory_type]
  111. try:
  112. # 使用各个记忆类型自己的检索方法
  113. type_results = memory_instance.retrieve(
  114. query=query,
  115. limit=per_type_limit,
  116. min_importance=min_importance,
  117. user_id=self.user_id
  118. )
  119. all_results.extend(type_results)
  120. except Exception as e:
  121. logger.warning(f"检索 {memory_type} 记忆时出错: {e}")
  122. continue
  123. # 按重要性和相关性排序
  124. all_results.sort(key=lambda x: x.importance, reverse=True)
  125. return all_results[:limit]
  126. def update_memory(
  127. self,
  128. memory_id: str,
  129. content: Optional[str] = None,
  130. importance: Optional[float] = None,
  131. metadata: Optional[Dict[str, Any]] = None
  132. ) -> bool:
  133. """更新记忆
  134. Args:
  135. memory_id: 记忆ID
  136. content: 新内容
  137. importance: 新重要性
  138. metadata: 新元数据
  139. Returns:
  140. 是否更新成功
  141. """
  142. # 查找记忆所在的类型
  143. for memory_type, memory_instance in self.memory_types.items():
  144. if memory_instance.has_memory(memory_id):
  145. return memory_instance.update(memory_id, content, importance, metadata)
  146. logger.warning(f"未找到记忆: {memory_id}")
  147. return False
  148. def remove_memory(self, memory_id: str) -> bool:
  149. """删除记忆
  150. Args:
  151. memory_id: 记忆ID
  152. Returns:
  153. 是否删除成功
  154. """
  155. for memory_type, memory_instance in self.memory_types.items():
  156. if memory_instance.has_memory(memory_id):
  157. return memory_instance.remove(memory_id)
  158. logger.warning(f"未找到记忆: {memory_id}")
  159. return False
  160. def forget_memories(
  161. self,
  162. strategy: str = "importance_based",
  163. threshold: float = 0.1,
  164. max_age_days: int = 30
  165. ) -> int:
  166. """记忆遗忘机制
  167. Args:
  168. strategy: 遗忘策略 ("importance_based", "time_based", "capacity_based")
  169. threshold: 遗忘阈值
  170. max_age_days: 最大保存天数
  171. Returns:
  172. 遗忘的记忆数量
  173. """
  174. total_forgotten = 0
  175. for memory_type, memory_instance in self.memory_types.items():
  176. if hasattr(memory_instance, 'forget'):
  177. forgotten = memory_instance.forget(strategy, threshold, max_age_days)
  178. total_forgotten += forgotten
  179. logger.info(f"记忆遗忘完成: {total_forgotten} 条记忆")
  180. return total_forgotten
  181. def consolidate_memories(
  182. self,
  183. from_type: str = "working",
  184. to_type: str = "episodic",
  185. importance_threshold: float = 0.7
  186. ) -> int:
  187. """记忆整合 - 将重要的短期记忆转换为长期记忆
  188. Args:
  189. from_type: 源记忆类型
  190. to_type: 目标记忆类型
  191. importance_threshold: 重要性阈值
  192. Returns:
  193. 整合的记忆数量
  194. """
  195. if from_type not in self.memory_types or to_type not in self.memory_types:
  196. logger.warning(f"记忆类型不存在: {from_type} -> {to_type}")
  197. return 0
  198. # 获取高重要性的源记忆
  199. source_memory = self.memory_types[from_type]
  200. target_memory = self.memory_types[to_type]
  201. # 获取需要整合的记忆
  202. all_memories = source_memory.get_all()
  203. candidates = [
  204. m for m in all_memories
  205. if m.importance >= importance_threshold
  206. ]
  207. consolidated_count = 0
  208. for memory in candidates:
  209. # 移动到目标记忆类型
  210. if source_memory.remove(memory.id):
  211. memory.memory_type = to_type
  212. memory.importance *= 1.1 # 提升重要性
  213. target_memory.add(memory)
  214. consolidated_count += 1
  215. logger.info(f"记忆整合完成: {consolidated_count} 条记忆从 {from_type} 转移到 {to_type}")
  216. return consolidated_count
  217. def get_memory_stats(self) -> Dict[str, Any]:
  218. """获取记忆统计信息"""
  219. stats = {
  220. "user_id": self.user_id,
  221. "enabled_types": list(self.memory_types.keys()),
  222. "total_memories": 0,
  223. "memories_by_type": {},
  224. "config": {
  225. "max_capacity": self.config.max_capacity,
  226. "importance_threshold": self.config.importance_threshold,
  227. "decay_factor": self.config.decay_factor
  228. }
  229. }
  230. for memory_type, memory_instance in self.memory_types.items():
  231. type_stats = memory_instance.get_stats()
  232. stats["memories_by_type"][memory_type] = type_stats
  233. # 使用count字段(活跃记忆数),而不是total_count(包含已遗忘的)
  234. stats["total_memories"] += type_stats.get("count", 0)
  235. return stats
  236. def clear_all_memories(self):
  237. """清空所有记忆"""
  238. for memory_type, memory_instance in self.memory_types.items():
  239. memory_instance.clear()
  240. logger.info("所有记忆已清空")
  241. def _classify_memory_type(self, content: str, metadata: Optional[Dict[str, Any]]) -> str:
  242. """自动分类记忆类型"""
  243. if metadata and metadata.get("type"):
  244. return metadata["type"]
  245. # 简单的分类逻辑,可以扩展为更复杂的分类器
  246. if self._is_episodic_content(content):
  247. return "episodic"
  248. elif self._is_semantic_content(content):
  249. return "semantic"
  250. else:
  251. return "working"
  252. def _is_episodic_content(self, content: str) -> bool:
  253. """判断是否为情景记忆内容"""
  254. episodic_keywords = ["昨天", "今天", "明天", "上次", "记得", "发生", "经历"]
  255. return any(keyword in content for keyword in episodic_keywords)
  256. def _is_semantic_content(self, content: str) -> bool:
  257. """判断是否为语义记忆内容"""
  258. semantic_keywords = ["定义", "概念", "规则", "知识", "原理", "方法"]
  259. return any(keyword in content for keyword in semantic_keywords)
  260. def _calculate_importance(self, content: str, metadata: Optional[Dict[str, Any]]) -> float:
  261. """计算记忆重要性"""
  262. importance = 0.5 # 基础重要性
  263. # 基于内容长度
  264. if len(content) > 100:
  265. importance += 0.1
  266. # 基于关键词
  267. important_keywords = ["重要", "关键", "必须", "注意", "警告", "错误"]
  268. if any(keyword in content for keyword in important_keywords):
  269. importance += 0.2
  270. # 基于元数据
  271. if metadata:
  272. if metadata.get("priority") == "high":
  273. importance += 0.3
  274. elif metadata.get("priority") == "low":
  275. importance -= 0.2
  276. return max(0.0, min(1.0, importance))
  277. def __str__(self) -> str:
  278. stats = self.get_memory_stats()
  279. return f"MemoryManager(user={self.user_id}, total={stats['total_memories']})"