builder.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510
  1. """ContextBuilder - GSSC流水线实现
  2. 实现 Gather-Select-Structure-Compress 上下文构建流程:
  3. 1. Gather: 从多源收集候选信息(历史、记忆、RAG、工具结果)
  4. 2. Select: 基于优先级、相关性、多样性筛选
  5. 3. Structure: 组织成结构化上下文模板
  6. 4. Compress: 在预算内压缩与规范化
  7. """
  8. from typing import Dict, Any, List, Optional, Tuple, TYPE_CHECKING, Any as TypingAny
  9. from dataclasses import dataclass, field
  10. from datetime import datetime
  11. import tiktoken
  12. import math
  13. from core.message import Message
  14. from core.llm import HelloAgentsLLM
  15. if TYPE_CHECKING:
  16. # Optional, only for type checking. Importing tools at runtime may pull in heavy optional deps.
  17. from tools import MemoryTool, RAGTool
  18. else:
  19. MemoryTool = TypingAny # type: ignore[assignment,misc]
  20. RAGTool = TypingAny # type: ignore[assignment,misc]
  21. @dataclass
  22. class ContextPacket:
  23. """上下文信息包"""
  24. content: str
  25. timestamp: datetime = field(default_factory=datetime.now)
  26. metadata: Dict[str, Any] = field(default_factory=dict)
  27. token_count: int = 0
  28. relevance_score: float = 0.0 # 0.0-1.0
  29. def __post_init__(self):
  30. """自动计算token数"""
  31. if self.token_count == 0:
  32. self.token_count = count_tokens(self.content)
  33. @dataclass
  34. class ContextConfig:
  35. """上下文构建配置"""
  36. max_tokens: int = 8000 # 总预算
  37. reserve_ratio: float = 0.15 # 生成余量(10-20%)
  38. min_relevance: float = 0.3 # 最小相关性阈值(仅对扩展上下文生效)
  39. max_history_turns: int = 10 # 最大保留对话轮数
  40. enable_mmr: bool = True # 启用最大边际相关性(多样性)
  41. mmr_lambda: float = 0.7 # MMR平衡参数(0=纯多样性, 1=纯相关性)
  42. system_prompt_template: str = "" # 系统提示模板
  43. enable_compression: bool = True # 启用压缩
  44. include_output_format: bool = True # 是否附加固定输出格式约束
  45. # 按需探索模式:不主动查询 memory/rag,由模型通过工具按需获取
  46. lazy_fetch: bool = True
  47. def get_available_tokens(self) -> int:
  48. """获取可用token预算(扣除余量)"""
  49. return int(self.max_tokens * (1 - self.reserve_ratio))
  50. class ContextBuilder:
  51. """上下文构建器 - GSSC流水线
  52. 设计理念(借鉴 Claude Code):
  53. - 保底上下文:系统提示 + 对话历史 + 上次工具摘要(自动注入,不可或缺)
  54. - 扩展上下文:memory/rag/notes(通过工具按需获取,模型自行决定)
  55. 用法示例:
  56. ```python
  57. # 方式1:只构建保底上下文(推荐,让模型按需获取扩展上下文)
  58. builder = ContextBuilder(config=ContextConfig(lazy_fetch=True))
  59. context = builder.build_base(
  60. user_query="用户问题",
  61. conversation_history=[...],
  62. system_instructions="系统指令",
  63. tool_summaries=[...] # 上次工具调用摘要
  64. )
  65. # 方式2:传统模式,主动收集所有上下文
  66. builder = ContextBuilder(
  67. memory_tool=memory_tool,
  68. rag_tool=rag_tool,
  69. config=ContextConfig(lazy_fetch=False)
  70. )
  71. context = builder.build(user_query="用户问题", ...)
  72. ```
  73. """
  74. def __init__(
  75. self,
  76. memory_tool: Optional[MemoryTool] = None,
  77. rag_tool: Optional[RAGTool] = None,
  78. config: Optional[ContextConfig] = None,
  79. llm: Optional[HelloAgentsLLM] = None,
  80. ):
  81. self.memory_tool = memory_tool
  82. self.rag_tool = rag_tool
  83. self.config = config or ContextConfig()
  84. self.llm = llm
  85. self._encoding = tiktoken.get_encoding("cl100k_base")
  86. def build_base(
  87. self,
  88. user_query: str,
  89. conversation_history: Optional[List[Message]] = None,
  90. system_instructions: Optional[str] = None,
  91. tool_summaries: Optional[List[str]] = None,
  92. pending_state: Optional[str] = None,
  93. ) -> str:
  94. """构建保底上下文(推荐使用)
  95. 只包含必需的基础上下文,不主动查询 memory/rag。
  96. 扩展上下文由模型通过 context_fetch 工具按需获取。
  97. 保底上下文包括:
  98. - 系统指令/安全约束
  99. - 最近 N 轮对话历史
  100. - 上次工具调用摘要
  101. - 待确认的补丁/计划状态
  102. Args:
  103. user_query: 用户查询
  104. conversation_history: 对话历史
  105. system_instructions: 系统指令
  106. tool_summaries: 上次工具调用摘要列表
  107. pending_state: 待确认的状态(补丁/计划等)
  108. Returns:
  109. 结构化上下文字符串
  110. """
  111. packets = []
  112. # P0: 系统指令(必须)
  113. if system_instructions:
  114. packets.append(ContextPacket(
  115. content=system_instructions,
  116. metadata={"type": "instructions"}
  117. ))
  118. # P1: 对话历史(必须)
  119. if conversation_history:
  120. recent_history = conversation_history[-self.config.max_history_turns:]
  121. history_text = "\n".join([
  122. f"[{msg.role}] {msg.content}"
  123. for msg in recent_history
  124. ])
  125. packets.append(ContextPacket(
  126. content=history_text,
  127. metadata={"type": "history", "count": len(recent_history)}
  128. ))
  129. # P2: 上次工具调用摘要(如有)
  130. if tool_summaries:
  131. summary_text = "\n".join(tool_summaries[-3:]) # 最多保留最近 3 条
  132. packets.append(ContextPacket(
  133. content=f"[上次工具结果摘要]\n{summary_text}",
  134. metadata={"type": "tool_summary"}
  135. ))
  136. # P3: 待确认状态(如有)
  137. if pending_state:
  138. packets.append(ContextPacket(
  139. content=f"[待确认状态]\n{pending_state}",
  140. metadata={"type": "pending_state"}
  141. ))
  142. # 直接结构化,不做相关性过滤(保底上下文全部保留)
  143. structured_context = self._structure_base(packets, user_query)
  144. # 压缩(如果超预算)
  145. return self._compress(structured_context)
  146. def _structure_base(
  147. self,
  148. packets: List[ContextPacket],
  149. user_query: str,
  150. ) -> str:
  151. """为保底上下文构建结构化模板"""
  152. sections = []
  153. # [Role & Policies]
  154. instructions = [p for p in packets if p.metadata.get("type") == "instructions"]
  155. if instructions:
  156. sections.append("[Role & Policies]\n" + "\n".join([p.content for p in instructions]))
  157. # [Context] - 对话历史
  158. history = [p for p in packets if p.metadata.get("type") == "history"]
  159. if history:
  160. sections.append("[Context]\n以下是最近的对话记录:\n" + "\n".join([p.content for p in history]))
  161. # [Evidence] - 工具摘要
  162. tool_summary = [p for p in packets if p.metadata.get("type") == "tool_summary"]
  163. if tool_summary:
  164. sections.append("[Evidence]\n" + "\n".join([p.content for p in tool_summary]))
  165. # [State] - 待确认状态
  166. pending = [p for p in packets if p.metadata.get("type") == "pending_state"]
  167. if pending:
  168. sections.append("[State]\n" + "\n".join([p.content for p in pending]))
  169. # [Task]
  170. sections.append(f"[Task]\n{user_query}")
  171. return "\n\n".join(sections)
  172. def build(
  173. self,
  174. user_query: str,
  175. conversation_history: Optional[List[Message]] = None,
  176. system_instructions: Optional[str] = None,
  177. additional_packets: Optional[List[ContextPacket]] = None
  178. ) -> str:
  179. """构建完整上下文
  180. Args:
  181. user_query: 用户查询
  182. conversation_history: 对话历史
  183. system_instructions: 系统指令
  184. additional_packets: 额外的上下文包
  185. Returns:
  186. 结构化上下文字符串
  187. """
  188. # 1. Gather: 收集候选信息
  189. packets = self._gather(
  190. user_query=user_query,
  191. conversation_history=conversation_history or [],
  192. system_instructions=system_instructions,
  193. additional_packets=additional_packets or []
  194. )
  195. # 2. Select: 筛选与排序
  196. selected_packets = self._select(packets, user_query)
  197. # 3. Structure: 组织成结构化模板
  198. structured_context = self._structure(
  199. selected_packets=selected_packets,
  200. user_query=user_query,
  201. system_instructions=system_instructions
  202. )
  203. # 4. Compress: 压缩与规范化(如果超预算)
  204. final_context = self._compress(structured_context)
  205. return final_context
  206. def _gather(
  207. self,
  208. user_query: str,
  209. conversation_history: List[Message],
  210. system_instructions: Optional[str],
  211. additional_packets: List[ContextPacket]
  212. ) -> List[ContextPacket]:
  213. """Gather: 收集候选信息
  214. 当 lazy_fetch=True 时,只收集保底上下文(系统指令+对话历史+额外包)。
  215. 当 lazy_fetch=False 时,主动查询 memory/rag(传统模式)。
  216. """
  217. packets = []
  218. # P0: 系统指令(强约束,总是保留)
  219. if system_instructions:
  220. packets.append(ContextPacket(
  221. content=system_instructions,
  222. metadata={"type": "instructions"}
  223. ))
  224. # P1: 对话历史(基础上下文,总是保留)
  225. if conversation_history:
  226. recent_history = conversation_history[-self.config.max_history_turns:]
  227. history_text = "\n".join([
  228. f"[{msg.role}] {msg.content}"
  229. for msg in recent_history
  230. ])
  231. packets.append(ContextPacket(
  232. content=history_text,
  233. metadata={"type": "history", "count": len(recent_history)}
  234. ))
  235. # 以下为扩展上下文,仅在 lazy_fetch=False 时主动收集
  236. if not self.config.lazy_fetch:
  237. # P2: 从记忆中获取任务状态与关键结论
  238. if self.memory_tool:
  239. try:
  240. # 搜索任务状态相关记忆
  241. state_results = self.memory_tool.execute(
  242. "search",
  243. query="(任务状态 OR 子目标 OR 结论 OR 阻塞)",
  244. min_importance=0.7,
  245. limit=5
  246. )
  247. if state_results and "未找到" not in state_results:
  248. packets.append(ContextPacket(
  249. content=state_results,
  250. metadata={"type": "task_state", "importance": "high"}
  251. ))
  252. # 搜索与当前查询相关的记忆
  253. related_results = self.memory_tool.execute(
  254. "search",
  255. query=user_query,
  256. limit=5
  257. )
  258. if related_results and "未找到" not in related_results:
  259. packets.append(ContextPacket(
  260. content=related_results,
  261. metadata={"type": "related_memory"}
  262. ))
  263. except Exception as e:
  264. print(f"⚠️ 记忆检索失败: {e}")
  265. # P3: 从RAG中获取事实证据
  266. if self.rag_tool:
  267. try:
  268. rag_results = self.rag_tool.run({
  269. "action": "search",
  270. "query": user_query,
  271. "top_k": 5
  272. })
  273. if rag_results and "未找到" not in rag_results and "错误" not in rag_results:
  274. packets.append(ContextPacket(
  275. content=rag_results,
  276. metadata={"type": "knowledge_base"}
  277. ))
  278. except Exception as e:
  279. print(f"⚠️ RAG检索失败: {e}")
  280. # 添加额外包(如上次工具结果摘要)
  281. packets.extend(additional_packets)
  282. return packets
  283. def _select(
  284. self,
  285. packets: List[ContextPacket],
  286. user_query: str
  287. ) -> List[ContextPacket]:
  288. """Select: 基于分数与预算的筛选"""
  289. # 1) 计算相关性(关键词重叠)
  290. query_tokens = set(user_query.lower().split())
  291. for packet in packets:
  292. content_tokens = set(packet.content.lower().split())
  293. if len(query_tokens) > 0:
  294. overlap = len(query_tokens & content_tokens)
  295. packet.relevance_score = overlap / len(query_tokens)
  296. else:
  297. packet.relevance_score = 0.0
  298. # 2) 计算新近性(指数衰减)
  299. def recency_score(ts: datetime) -> float:
  300. delta = max((datetime.now() - ts).total_seconds(), 0)
  301. tau = 3600 # 1小时时间尺度,可暴露到配置
  302. return math.exp(-delta / tau)
  303. # 3) 计算复合分:0.7*相关性 + 0.3*新近性
  304. scored_packets: List[Tuple[float, ContextPacket]] = []
  305. for p in packets:
  306. rec = recency_score(p.timestamp)
  307. score = 0.7 * p.relevance_score + 0.3 * rec
  308. scored_packets.append((score, p))
  309. # 4) 系统指令和对话历史单独拿出,固定纳入(这两者是基础上下文,不应被过滤)
  310. must_keep_types = {"instructions", "history"}
  311. must_keep_packets = [p for (_, p) in scored_packets if p.metadata.get("type") in must_keep_types]
  312. remaining = [p for (s, p) in sorted(scored_packets, key=lambda x: x[0], reverse=True)
  313. if p.metadata.get("type") not in must_keep_types]
  314. # 5) 依据 min_relevance 过滤(仅对扩展上下文:memory、RAG、notes 等)
  315. filtered = [p for p in remaining if p.relevance_score >= self.config.min_relevance]
  316. # 6) 按预算填充
  317. available_tokens = self.config.get_available_tokens()
  318. selected: List[ContextPacket] = []
  319. used_tokens = 0
  320. # 先放入必须保留的上下文(系统指令 + 对话历史)
  321. for p in must_keep_packets:
  322. if used_tokens + p.token_count <= available_tokens:
  323. selected.append(p)
  324. used_tokens += p.token_count
  325. # 再按分数加入其余
  326. for p in filtered:
  327. if used_tokens + p.token_count > available_tokens:
  328. continue
  329. selected.append(p)
  330. used_tokens += p.token_count
  331. return selected
  332. def _structure(
  333. self,
  334. selected_packets: List[ContextPacket],
  335. user_query: str,
  336. system_instructions: Optional[str]
  337. ) -> str:
  338. """Structure: 组织成结构化上下文模板"""
  339. sections = []
  340. # [Role & Policies] - 系统指令
  341. p0_packets = [p for p in selected_packets if p.metadata.get("type") == "instructions"]
  342. if p0_packets:
  343. role_section = "[Role & Policies]\n"
  344. role_section += "\n".join([p.content for p in p0_packets])
  345. sections.append(role_section)
  346. # [Task] - 当前任务
  347. sections.append(f"[Task]\n用户问题:{user_query}")
  348. # [State] - 任务状态
  349. p1_packets = [p for p in selected_packets if p.metadata.get("type") == "task_state"]
  350. if p1_packets:
  351. state_section = "[State]\n关键进展与未决问题:\n"
  352. state_section += "\n".join([p.content for p in p1_packets])
  353. sections.append(state_section)
  354. # [Evidence] - 事实证据
  355. p2_packets = [
  356. p for p in selected_packets
  357. if p.metadata.get("type") in {"related_memory", "knowledge_base", "retrieval", "tool_result"}
  358. ]
  359. if p2_packets:
  360. evidence_section = "[Evidence]\n事实与引用:\n"
  361. for p in p2_packets:
  362. evidence_section += f"\n{p.content}\n"
  363. sections.append(evidence_section)
  364. # [Recent Conversation] - 对话历史(明确标识)
  365. p3_packets = [p for p in selected_packets if p.metadata.get("type") == "history"]
  366. if p3_packets:
  367. context_section = "[Recent Conversation]\n以下是最近的对话记录。当用户询问之前的对话内容时,请参考此部分:\n"
  368. context_section += "\n".join([p.content for p in p3_packets])
  369. sections.append(context_section)
  370. # [Output] - 输出约束(可选)
  371. if self.config.include_output_format:
  372. output_section = """[Output]
  373. 请按以下格式回答:
  374. 1. 结论(简洁明确)
  375. 2. 依据(列出支撑证据及来源)
  376. 3. 风险与假设(如有)
  377. 4. 下一步行动建议(如适用)"""
  378. sections.append(output_section)
  379. return "\n\n".join(sections)
  380. def _compress(self, context: str) -> str:
  381. """Compress: 压缩与规范化"""
  382. if not self.config.enable_compression:
  383. return context
  384. current_tokens = count_tokens(context)
  385. available_tokens = self.config.get_available_tokens()
  386. if current_tokens <= available_tokens:
  387. return context
  388. # LLM 压缩(更保真):仅在提供 llm 时启用
  389. if self.llm is not None:
  390. try:
  391. target = available_tokens
  392. # 尽量保留结构:Role/Task 原样,压缩 Evidence/Context
  393. sys = (
  394. "你是一个上下文压缩器。将输入的多段上下文压缩到目标 token 预算内,"
  395. "尽量保留:用户目标、关键约束、关键证据(文件路径/命令/错误/结论)。"
  396. "不要编造。保持原有分段标题([Role & Policies]/[Task]/[State]/[Evidence]/[Context] 等),"
  397. "但可以大幅精简 [Evidence]/[Context] 内容。输出应尽量短。"
  398. )
  399. user = f"目标预算(约): {target} tokens\n\n原始上下文:\n{context}"
  400. compressed = self.llm.invoke(
  401. [
  402. {"role": "system", "content": sys},
  403. {"role": "user", "content": user},
  404. ],
  405. max_tokens=min(1200, int(self.config.max_tokens * 0.4)),
  406. )
  407. if compressed and isinstance(compressed, str) and count_tokens(compressed) <= target:
  408. return compressed
  409. except Exception:
  410. pass
  411. # 简单截断策略(保留前N个token)
  412. # 实际应用中可用LLM做高保真摘要
  413. print(f"⚠️ 上下文超预算 ({current_tokens} > {available_tokens}),执行截断")
  414. # 按段落截断,保留结构
  415. lines = context.split("\n")
  416. compressed_lines = []
  417. used_tokens = 0
  418. for line in lines:
  419. line_tokens = count_tokens(line)
  420. if used_tokens + line_tokens > available_tokens:
  421. break
  422. compressed_lines.append(line)
  423. used_tokens += line_tokens
  424. return "\n".join(compressed_lines)
  425. def count_tokens(text: str) -> int:
  426. """计算文本token数(使用tiktoken)"""
  427. try:
  428. encoding = tiktoken.get_encoding("cl100k_base")
  429. return len(encoding.encode(text))
  430. except Exception:
  431. # 降级方案:粗略估算(1 token ≈ 4 字符)
  432. return len(text) // 4