1
0

utils.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. """公共工具函数模块"""
  2. import json
  3. import re
  4. from datetime import datetime
  5. from typing import Dict, Any, Optional, Tuple, List
  6. class JSONExtractor:
  7. """
  8. 统一的 JSON 提取器
  9. 从各种格式的 LLM 响应中提取 JSON 数据,支持:
  10. - 纯 JSON 响应
  11. - Markdown 代码块中的 JSON
  12. - Finish[...] 格式(ReAct 标准格式)
  13. - 混杂文本中的 JSON
  14. """
  15. @staticmethod
  16. def extract(
  17. response: str,
  18. required_fields: Optional[List[str]] = None,
  19. fallback_fields: Optional[Dict[str, Any]] = None
  20. ) -> Dict[str, Any]:
  21. """
  22. 从响应中提取 JSON
  23. Args:
  24. response: LLM 响应文本
  25. required_fields: 必需的字段列表,用于验证和优先选择
  26. fallback_fields: 当字段缺失时的默认值
  27. Returns:
  28. 提取的 JSON 字典
  29. Raises:
  30. ValueError: 无法提取有效 JSON 时
  31. """
  32. if not response or not response.strip():
  33. raise ValueError("响应为空")
  34. # 初始化默认值
  35. fallback_fields = fallback_fields or {}
  36. required_fields = required_fields or []
  37. # 尝试多种提取方法
  38. extractors = [
  39. JSONExtractor._extract_from_finish,
  40. JSONExtractor._extract_direct_json,
  41. JSONExtractor._extract_from_markdown_json,
  42. JSONExtractor._extract_from_markdown,
  43. JSONExtractor._extract_from_braces,
  44. ]
  45. last_error = None
  46. for extractor in extractors:
  47. try:
  48. result = extractor(response)
  49. if result is not None:
  50. # 应用默认值
  51. for key, default_value in fallback_fields.items():
  52. if key not in result:
  53. result[key] = default_value
  54. # 如果有必需字段,优先选择包含这些字段的结果
  55. if required_fields:
  56. missing = [f for f in required_fields if f not in result]
  57. if not missing:
  58. return result
  59. else:
  60. return result
  61. except Exception as e:
  62. last_error = e
  63. continue
  64. # 尝试从历史记录中提取(用于 PlanAndSolve 等场景)
  65. try:
  66. result = JSONExtractor._extract_from_history(response)
  67. if result is not None:
  68. for key, default_value in fallback_fields.items():
  69. if key not in result:
  70. result[key] = default_value
  71. return result
  72. except Exception as e:
  73. last_error = e
  74. raise ValueError(f"响应中未找到有效的 JSON 数据: {last_error}")
  75. @staticmethod
  76. def _extract_from_finish(response: str) -> Optional[Dict[str, Any]]:
  77. """从 Finish[...] 格式中提取"""
  78. match = re.search(r"Finish\[(.*)\]", response, re.DOTALL)
  79. if match:
  80. content = match.group(1).strip()
  81. return JSONExtractor._parse_json_with_retry(content)
  82. return None
  83. @staticmethod
  84. def _extract_direct_json(response: str) -> Optional[Dict[str, Any]]:
  85. """直接解析 JSON"""
  86. stripped = response.strip()
  87. if stripped.startswith('{'):
  88. return JSONExtractor._parse_json_with_retry(stripped)
  89. return None
  90. @staticmethod
  91. def _extract_from_markdown_json(response: str) -> Optional[Dict[str, Any]]:
  92. """从 ```json 代码块中提取"""
  93. if "```json" not in response:
  94. return None
  95. json_start = response.find("```json") + 7
  96. json_end = response.find("```", json_start)
  97. if json_end == -1:
  98. return None
  99. json_str = response[json_start:json_end].strip()
  100. return JSONExtractor._parse_json_with_retry(json_str)
  101. @staticmethod
  102. def _extract_from_markdown(response: str) -> Optional[Dict[str, Any]]:
  103. """从普通 ``` 代码块中提取"""
  104. if "```" not in response:
  105. return None
  106. json_start = response.find("```") + 3
  107. json_end = response.find("```", json_start)
  108. if json_end == -1:
  109. return None
  110. json_str = response[json_start:json_end].strip()
  111. # 移除可能的语言标识符
  112. if json_str.startswith("json"):
  113. json_str = json_str[4:].strip()
  114. if json_str.startswith('{'):
  115. return JSONExtractor._parse_json_with_retry(json_str)
  116. return None
  117. @staticmethod
  118. def _extract_from_braces(response: str) -> Optional[Dict[str, Any]]:
  119. """从大括号中提取所有可能的 JSON 对象"""
  120. json_candidates = []
  121. i = 0
  122. while i < len(response):
  123. if response[i] == '{':
  124. brace_count = 0
  125. brace_start = i
  126. brace_end = i
  127. for j in range(i, len(response)):
  128. if response[j] == '{':
  129. brace_count += 1
  130. elif response[j] == '}':
  131. brace_count -= 1
  132. if brace_count == 0:
  133. brace_end = j + 1
  134. break
  135. if brace_end > brace_start:
  136. json_str = response[brace_start:brace_end]
  137. try:
  138. parsed = JSONExtractor._parse_json_with_retry(json_str)
  139. if isinstance(parsed, dict):
  140. json_candidates.append((parsed, len(parsed)))
  141. except:
  142. pass
  143. i = brace_end
  144. else:
  145. i += 1
  146. else:
  147. i += 1
  148. if json_candidates:
  149. # 优先选择包含 'content' 字段的,否则选择字段最多的
  150. for parsed, _ in json_candidates:
  151. if 'content' in parsed and parsed.get('content'):
  152. return parsed
  153. # 返回字段最多的
  154. return max(json_candidates, key=lambda x: x[1])[0]
  155. return None
  156. @staticmethod
  157. def _extract_from_history(response: str) -> Optional[Dict[str, Any]]:
  158. """从历史记录格式中提取(用于 PlanAndSolve 等场景)"""
  159. if "步骤" not in response and "结果" not in response:
  160. return None
  161. # 查找所有包含 JSON 的步骤结果
  162. json_matches = re.findall(r'```json\s*(\{.*?\})\s*```', response, re.DOTALL)
  163. if not json_matches:
  164. json_matches = re.findall(r'(\{"column_title".*?"topics".*?\})', response, re.DOTALL)
  165. for json_str in json_matches:
  166. try:
  167. return json.loads(json_str)
  168. except json.JSONDecodeError:
  169. continue
  170. return None
  171. @staticmethod
  172. def _parse_json_with_retry(json_str: str) -> Dict[str, Any]:
  173. """尝试多种方式解析 JSON"""
  174. # 方法1: 直接解析
  175. try:
  176. return json.loads(json_str)
  177. except json.JSONDecodeError:
  178. pass
  179. # 方法2: 修复未转义的换行符
  180. fixed = json_str.replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t')
  181. try:
  182. return json.loads(fixed)
  183. except json.JSONDecodeError:
  184. pass
  185. # 方法3: 提取并重新构建 JSON(针对内容字段)
  186. result = JSONExtractor._rebuild_json_from_fields(json_str)
  187. if result:
  188. return result
  189. raise json.JSONDecodeError("无法解析 JSON", json_str, 0)
  190. @staticmethod
  191. def _rebuild_json_from_fields(json_str: str) -> Optional[Dict[str, Any]]:
  192. """从字段中重新构建 JSON"""
  193. title_match = re.search(r'"title"\s*:\s*"([^"]*)"', json_str)
  194. level_match = re.search(r'"level"\s*:\s*(\d+)', json_str)
  195. word_count_match = re.search(r'"word_count"\s*:\s*(\d+)', json_str)
  196. needs_expansion_match = re.search(r'"needs_expansion"\s*:\s*(true|false)', json_str)
  197. # 提取 content(可能跨多行)
  198. content_match = re.search(r'"content"\s*:\s*"(.*?)"(?=\s*[,}])', json_str, re.DOTALL)
  199. if not content_match:
  200. content_match = re.search(r'"content"\s*:\s*"([^"]*(?:\\.[^"]*)*)"', json_str, re.DOTALL)
  201. # 如果没有找到任何字段,返回 None
  202. if not any([title_match, level_match, content_match]):
  203. return None
  204. result = {}
  205. if title_match:
  206. result['title'] = title_match.group(1)
  207. if level_match:
  208. result['level'] = int(level_match.group(1))
  209. if content_match:
  210. content = content_match.group(1)
  211. content = content.replace('\\n', '\n').replace('\\r', '\r').replace('\\t', '\t')
  212. result['content'] = content
  213. if word_count_match:
  214. result['word_count'] = int(word_count_match.group(1))
  215. else:
  216. result['word_count'] = len(result.get('content', ''))
  217. if needs_expansion_match:
  218. result['needs_expansion'] = needs_expansion_match.group(1) == 'true'
  219. else:
  220. result['needs_expansion'] = False
  221. result.setdefault('subsections', [])
  222. result.setdefault('metadata', {})
  223. return result
  224. def parse_react_output(text: str) -> Tuple[Optional[str], Optional[str]]:
  225. """
  226. 解析 ReAct Agent 的输出
  227. 支持多种格式:
  228. - 标准 ReAct 格式: Thought: ... Action: ...
  229. - 中文格式: 思考: ... 行动: ...
  230. - Finish[...] 格式
  231. Args:
  232. text: LLM 的原始响应文本
  233. Returns:
  234. (thought, action) 元组
  235. """
  236. if not text or not text.strip():
  237. print("▸️ 警告: LLM 返回了空响应")
  238. return None, None
  239. # 解析 Thought
  240. thought = None
  241. thought_end_pos = 0
  242. thought_patterns = [
  243. r"Thought:\s*(.*?)(?=\nAction:|\nFinish:|$)", # 标准格式
  244. r"思考:\s*(.*?)(?=\n行动:|\n完成:|$)", # 中文格式
  245. ]
  246. for pattern in thought_patterns:
  247. match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
  248. if match:
  249. thought = match.group(1).strip()
  250. if thought:
  251. thought_end_pos = match.end()
  252. break
  253. # 解析 Action
  254. action = None
  255. action_patterns = [
  256. r"Action:\s*(.*?)(?=\nThought:|\nObservation:|\nFinish:|$)", # 标准格式
  257. r"行动:\s*(.*?)(?=\n思考:|\n观察:|\n完成:|$)", # 中文格式
  258. r"Finish\[(.*?)\]", # Finish 格式
  259. ]
  260. for pattern in action_patterns:
  261. match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
  262. if match:
  263. action = match.group(1).strip()
  264. if action:
  265. if pattern == r"Finish\[(.*?)\]":
  266. action = f"Finish[{action}]"
  267. break
  268. # 尝试其他 Finish 格式
  269. if not action:
  270. finish_patterns = [
  271. r"Finish\s*\[(.*?)\]",
  272. r"完成\s*\[(.*?)\]",
  273. r"最终答案:\s*(.*?)(?=\n|$)",
  274. ]
  275. for pattern in finish_patterns:
  276. match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
  277. if match:
  278. content = match.group(1).strip()
  279. if content:
  280. action = f"Finish[{content}]"
  281. break
  282. # 如果仍未找到 Action,检查是否有完整内容
  283. if not action:
  284. action = _try_extract_complete_content(text, thought, thought_end_pos)
  285. if not action:
  286. print(f"▸️ 警告: 未能解析出 Action")
  287. print(f" 响应内容(前500字符): {text[:500]}")
  288. print(f" 已解析的 Thought: {thought[:100] if thought else 'None'}...")
  289. return thought, action
  290. def _try_extract_complete_content(
  291. text: str,
  292. thought: Optional[str],
  293. thought_end_pos: int
  294. ) -> Optional[str]:
  295. """
  296. 尝试从响应中提取完整内容并包装为 Finish 格式
  297. Args:
  298. text: 原始文本
  299. thought: 已解析的 thought
  300. thought_end_pos: thought 结束位置
  301. Returns:
  302. 包装后的 action 或 None
  303. """
  304. # 查找 JSON 内容
  305. json_match = None
  306. brace_start = text.find('{')
  307. if brace_start != -1:
  308. brace_end = text.rfind('}')
  309. if brace_end > brace_start:
  310. potential_json = text[brace_start:brace_end + 1]
  311. if '"content"' in potential_json or "'content'" in potential_json:
  312. json_match = re.search(r'\{.*?"content".*?\}', potential_json, re.DOTALL)
  313. # 确定要检查的文本
  314. if thought:
  315. remaining_text = text[thought_end_pos:].strip()
  316. if not remaining_text:
  317. remaining_text = thought
  318. else:
  319. remaining_text = text.strip()
  320. # 移除前缀
  321. remaining_text = re.sub(r'^(Action|Finish|行动|完成)[::]\s*', '', remaining_text, flags=re.IGNORECASE)
  322. if not remaining_text and not json_match:
  323. return None
  324. # 使用 JSON 内容
  325. if json_match:
  326. remaining_text = json_match.group(0)
  327. json_str = remaining_text
  328. open_braces = json_str.count('{')
  329. close_braces = json_str.count('}')
  330. json_complete = (open_braces == close_braces) and open_braces > 0
  331. else:
  332. json_complete = False
  333. json_match_check = re.search(r'\{.*?"content".*?\}', remaining_text, re.DOTALL)
  334. if json_match_check:
  335. json_str = json_match_check.group(0)
  336. open_braces = json_str.count('{')
  337. close_braces = json_str.count('}')
  338. json_complete = (open_braces == close_braces) and open_braces > 0
  339. # 检查完成标记
  340. has_ending = bool(re.search(
  341. r'(总结|结论|结语|小结|综上所述|总之|最后|end|conclusion)',
  342. remaining_text[-500:] if len(remaining_text) > 500 else remaining_text,
  343. re.IGNORECASE
  344. ))
  345. has_continuation = bool(re.search(
  346. r'(未完待续|待续|继续|to be continued|未完|待补充)',
  347. remaining_text,
  348. re.IGNORECASE
  349. ))
  350. content_length = len(remaining_text)
  351. is_substantial = content_length > 200
  352. # 判断是否完成
  353. is_complete = False
  354. completion_reason = []
  355. if json_complete:
  356. is_complete = True
  357. completion_reason.append("完整的 JSON 结构")
  358. elif has_ending:
  359. is_complete = True
  360. completion_reason.append("有结尾标记")
  361. elif is_substantial and not has_continuation:
  362. is_complete = True
  363. completion_reason.append("内容足够长且无未完标记")
  364. if is_complete:
  365. print(f"▸ 检测到完整正文内容(长度: {content_length} 字符),自动添加 Finish 前缀")
  366. print(f" - 判断依据: {', '.join(completion_reason)}")
  367. return f"Finish[{remaining_text}]"
  368. else:
  369. print(f"▸️ 检测到部分正文内容(长度: {content_length} 字符),但可能未完成")
  370. if has_continuation:
  371. print(f" - 检测到'未完待续'标记,继续循环让模型完成写作")
  372. elif not is_substantial:
  373. print(f" - 内容长度不足,继续循环让模型完成写作")
  374. return None
  375. def get_current_timestamp() -> str:
  376. """获取当前时间戳(ISO 格式)"""
  377. return datetime.now().isoformat()