chapter_generate_agent.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
import json
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from dotenv import load_dotenv

# Kept ahead of the project imports, as in the original ordering, in case they
# read environment variables at import time.
load_dotenv()

from pydantic import BaseModel
from hello_agents import SimpleAgent, HelloAgentsLLM
from hello_agents.tools import NoteTool
from prompt import CHAPTER_PROMPT, CHAPTER_REVIEW_PROMPT, CHAPTER_START_PROMPT
  12. def extract_note_id(output: str) -> str:
  13. """从 NoteTool 的输出文本中提取 note_id"""
  14. match = re.search(r"ID:\s*(note_[0-9_]+)", output)
  15. if not match:
  16. raise ValueError(f"无法从输出解析 note_id:\n{output}")
  17. return match.group(1)
  18. class MemoryItem(BaseModel):
  19. """记忆项数据结构"""
  20. node_id: str
  21. novel_id: str
  22. title: str
  23. content: str
  24. summary: str
  25. timestamp: datetime
  26. metadata: Dict[str, Any] = {}
  27. next_chapter_prediction: str = ""
  28. class ChapterGenerateAgent:
  29. """具有上下文感知能力的 Agent"""
  30. def __init__(self, name: str, llm: HelloAgentsLLM = HelloAgentsLLM(), max_steps: int = 5, chapter_length: int = 3000, **kwargs):
  31. self.chapter_length = chapter_length
  32. self.max_steps = max_steps
  33. self.num_chapter_memories = kwargs.get("num_chapter_memories", 5)
  34. self.workspace = kwargs.get("workspace", "./outputs")
  35. self.note_tools: Dict[str, NoteTool] = {}
  36. self.generate_agent = SimpleAgent(name="章节生成助手", llm=llm, system_prompt='你是一位擅长长篇小说结构与文本细化的专业作者助理。')
  37. self.review_agent = SimpleAgent(name="章节审核助手", llm=llm, system_prompt='你是一位专业的小说审核助手,负责检查章节是否符合小说的结构和风格。')
  38. # 内存存储
  39. self.memories: Dict[str, List[MemoryItem]] = {}
  40. @staticmethod
  41. def extract_json_from_response(response: str) -> dict:
  42. """从模型输出中提取并解析 JSON"""
  43. # 尝试清理 Markdown 代码块标记
  44. clean_response = re.sub(r"```json\s*", "", response)
  45. clean_response = re.sub(r"```\s*$", "", clean_response)
  46. clean_response = clean_response.strip()
  47. try:
  48. return json.loads(clean_response)
  49. except json.JSONDecodeError as e:
  50. # 如果直接解析失败,尝试在文本中寻找第一个 { 和最后一个 }
  51. try:
  52. start = clean_response.find("{")
  53. end = clean_response.rfind("}")
  54. if start != -1 and end != -1:
  55. json_str = clean_response[start : end + 1]
  56. return json.loads(json_str)
  57. except Exception:
  58. pass
  59. raise ValueError(f"无法解析 JSON 响应: {response}") from e
  60. def _ensure_tool(self, novel_id: str, novel_title: str = None):
  61. if not self.note_tools.get(novel_id):
  62. if not novel_title:
  63. raise ValueError(f"Tool for novel_id {novel_id} not initialized and novel_title not provided.")
  64. self.note_tools[novel_id] = NoteTool(workspace=os.path.join(self.workspace, f"{novel_title}-{novel_id}", 'chapters'))
  65. def get_content_from_note(self, content: str) -> str:
  66. try:
  67. # 去除 YAML 前置元数据
  68. frontmatter_match = re.match(r'^---\s*\n(.*?)\n---\s*\n', content, re.DOTALL)
  69. if frontmatter_match:
  70. content = content[frontmatter_match.end():].strip()
  71. # 去除标题(第一行如果是标题)
  72. lines = content.split('\n')
  73. if lines and lines[0].startswith('# '):
  74. content = '\n'.join(lines[1:]).strip()
  75. return content
  76. except:
  77. return content
  78. def get_memories(self, novel_id: str):
  79. """获取最近章节记忆"""
  80. if not hasattr(self.note_tools[novel_id], "notes_index"):
  81. self.note_tools[novel_id]._load_index()
  82. notes = self.note_tools[novel_id].notes_index.get("notes", [])
  83. # 筛选相关章节笔记
  84. chapter_notes = [
  85. n for n in notes
  86. if n.get("note_type") == "chapter" and str(novel_id) in n.get("title", "")
  87. ]
  88. # 获取最后 N 章
  89. recent_notes = chapter_notes[-self.num_chapter_memories:]
  90. for note in recent_notes:
  91. note_id = note.get("id")
  92. file_path = os.path.join(self.workspace, f"{note_id}.md")
  93. if os.path.exists(file_path):
  94. with open(file_path, "r", encoding="utf-8") as f:
  95. content = f.read()
  96. content = self.get_content_from_note(content)
  97. self.memories[novel_id].append(MemoryItem(
  98. node_id=str(note_id),
  99. title=note.get("title", "未知章节").strip(),
  100. content=content,
  101. novel_id=str(novel_id),
  102. summary=note['tags'][0]if note.get("tags") and note['tags'] else '',
  103. timestamp=datetime.fromisoformat(note.get("created_at", datetime.now().isoformat()))
  104. ))
  105. def run(self, user_input: str, **kwargs) -> str:
  106. """运行 Agent"""
  107. # 小说id用来区分小说,命名可能会重复
  108. novel_id = kwargs.pop("novel_id", None)
  109. assert novel_id, "请提供小说ID"
  110. novel_title = kwargs.pop("novel_title", None)
  111. assert novel_title, "请提供小说标题"
  112. self._ensure_tool(novel_id, novel_title)
  113. if not self.memories.get(novel_id):
  114. self.memories[novel_id] = []
  115. self.get_memories(novel_id)
  116. # 1. 构建上下文
  117. outline = self.get_outline(novel_id)
  118. prev_chapter = self.get_prev_chapter(novel_id)
  119. prev_summaries = self.get_prev_summaries(novel_id)
  120. chapter_length = kwargs.get("chapter_length", self.chapter_length)
  121. context = self.get_prompt(outline, prev_chapter, prev_summaries, user_input, novel_id, chapter_length=chapter_length)
  122. # 2. 使用上下文调用 LLM
  123. steps = 0
  124. while steps < self.max_steps:
  125. steps += 1
  126. # 生成章节内容
  127. response = self.generate_agent.run(context)
  128. try:
  129. response_data = self.extract_json_from_response(response)
  130. # 检查是否包含必要字段
  131. if 'title' not in response_data or 'content' not in response_data or 'next_chapter_prediction' not in response_data or 'summary' not in response_data:
  132. raise ValueError("JSON 响应缺少必要字段 'title' 或 'content' 或 'next_chapter_prediction' 或 'summary'")
  133. except ValueError as e:
  134. print(f"步骤 {steps} 生成的 JSON 解析错误:{e}")
  135. continue
  136. # 审核章节内容
  137. review_context = CHAPTER_REVIEW_PROMPT.format(
  138. outline=outline,
  139. prev_chapter=prev_chapter,
  140. prev_summaries=prev_summaries,
  141. chapter_content=response_data.get('content', '')
  142. )
  143. review_response = self.review_agent.run(review_context)
  144. # 检查审核结果
  145. if "【通过】" in review_response:
  146. break
  147. context = self.get_prompt(outline, prev_chapter, prev_summaries, user_input, novel_id, response_data, review_response, chapter_length=chapter_length)
  148. # 3. 保存章节到笔记
  149. create_output = self.note_tools[novel_id].run({
  150. "action": "create",
  151. "title": f"{response_data.get('title', '未知章节')}",
  152. "content": response_data.get('content', ''),
  153. "note_type": "chapter",
  154. "tags": [response_data.get('summary', '')]
  155. })
  156. # 获取章节笔记ID,保存记忆,并建立与小说ID的关联
  157. note_id = extract_note_id(create_output)
  158. self.memories[novel_id].append(MemoryItem(
  159. node_id=note_id,
  160. title=response_data.get('title', '未知章节'),
  161. content=response_data.get('content', ''),
  162. novel_id=novel_id,
  163. summary=response_data.get('summary', ''),
  164. timestamp=datetime.now().isoformat(),
  165. next_chapter_prediction=response_data.get('next_chapter_prediction', '')
  166. ))
  167. return response_data, note_id
  168. def get_prompt(self, outline: str, prev_chapter: str, prev_summaries: str, user_input: str, novel_id: str, response_data: dict = None, review_response: str = None, chapter_length: int = None) -> str:
  169. """获取章节生成提示"""
  170. if chapter_length is None:
  171. chapter_length = self.chapter_length
  172. is_first_chapter = (prev_chapter == '无' and prev_summaries == '无')
  173. if is_first_chapter:
  174. prompt_template = CHAPTER_START_PROMPT
  175. context = prompt_template.format(
  176. outline=outline,
  177. chapter_history='无' if response_data is None else response_data.get('content', '无'),
  178. evaluation=review_response or "无",
  179. user_input=user_input,
  180. chapter_length=chapter_length
  181. )
  182. else:
  183. prompt_template = CHAPTER_PROMPT
  184. context = prompt_template.format(
  185. outline=outline,
  186. prev_chapter=prev_chapter,
  187. prev_summaries=prev_summaries,
  188. chapter_history='无' if response_data is None else response_data.get('content', '无'),
  189. evaluation=review_response or "无",
  190. user_input=user_input or [self.memories[novel_id][-1].next_chapter_prediction if self.memories[novel_id] else "无"][0],
  191. chapter_length=chapter_length
  192. )
  193. return context
  194. def get_outline(self, novel_id: str) -> str:
  195. """获取大纲"""
  196. dir_path = f"{os.path.dirname(self.note_tools[novel_id].workspace)}/outline"
  197. paths = os.listdir(dir_path)
  198. assert len(paths) >= 1, f"目录 {dir_path} 下应该有大纲文件"
  199. # 简单取第一个文件,实际可能需要更精确的逻辑
  200. path = f"{dir_path}/{paths[0]}"
  201. with open(path, "r", encoding='utf-8') as f:
  202. outline = f.read()
  203. return self.get_content_from_note(outline)
  204. def get_prev_chapter(self, novel_id: str):
  205. """获取前一章内容"""
  206. if self.memories.get(novel_id):
  207. last_mem = self.memories[novel_id][-1]
  208. return f"【{last_mem.metadata.get('title', '未知')}】\n...{last_mem.content[-800:]}"
  209. return "无"
  210. def get_prev_summaries(self, novel_id: str):
  211. if self.memories.get(novel_id):
  212. return "\n".join([f"【{mem.title}】\n{mem.summary}" for mem in self.memories[novel_id][-self.num_chapter_memories:]])
  213. return "无"
  214. def del_chapter(self, novel_id:str, note_id: str, novel_title: str = None):
  215. """删除章节"""
  216. if novel_title:
  217. self._ensure_tool(novel_id, novel_title)
  218. self.note_tools[novel_id].run({
  219. "action": "delete",
  220. "note_id": note_id
  221. })
  222. # 从记忆中删除该章节
  223. if self.memories.get(novel_id):
  224. self.memories[novel_id] = [mem for mem in self.memories[novel_id] if mem.node_id != note_id]
  225. def update_chapter(self, novel_id:str, note_id: str, novel_title: str = None, **kwargs):
  226. """更新章节"""
  227. if novel_title:
  228. self._ensure_tool(novel_id, novel_title)
  229. self.note_tools[novel_id].run({
  230. "action": "update",
  231. "note_id": note_id,
  232. **kwargs
  233. })
  234. # 更新记忆中的章节内容
  235. if self.memories.get(novel_id):
  236. for mem in self.memories[novel_id]:
  237. if mem.node_id == note_id:
  238. mem.title = kwargs.get('title', mem.title)
  239. mem.content = kwargs.get('content', mem.content)
  240. mem.summary = kwargs.get('summary', mem.summary)
  241. mem.next_chapter_prediction = kwargs.get('next_chapter_prediction', mem.next_chapter_prediction)
  242. mem.timestamp = datetime.now().isoformat()
  243. break
  244. def main():
  245. print("=" * 80)
  246. print("Novel ChapterGenerateAgent 示例")
  247. print("=" * 80 + "\n")
  248. # llm = HelloAgentsLLM(model="qwen3:0.6b", api_key="ollama", base_url="http://127.0.0.1:11434/v1", provider='ollama')
  249. llm = HelloAgentsLLM(provider='qwen')
  250. novel_id = "demo_novel_001"
  251. novel_title = "记忆之城"
  252. # 1. 模拟大纲文件存在
  253. # 因为 ChapterGenerateAgent.get_outline 依赖于文件系统查找大纲
  254. # 我们手动创建一个假的大纲文件用于测试
  255. workspace_root = "./outputs"
  256. # 注意:这里模拟 OutlineAgent 的输出路径结构
  257. outline_dir = os.path.join(workspace_root, f"{novel_title}-{novel_id}", "outline")
  258. if not os.path.exists(outline_dir):
  259. os.makedirs(outline_dir)
  260. # 清理旧文件以确保测试环境干净
  261. for f in os.listdir(outline_dir):
  262. try:
  263. os.remove(os.path.join(outline_dir, f))
  264. except Exception:
  265. pass
  266. dummy_outline_content = """---
  267. tags: [outline]
  268. created_at: 2025-01-27T10:00:00
  269. ---
  270. # 记忆之城-大纲
  271. ## 核心梗概
  272. 一位能与城市记忆对话的年轻人,在拆迁浪潮中发现一段被刻意抹去的历史。
  273. ## 主要人物
  274. - 李寻:主角,拥有"读取"物体记忆的能力。
  275. - 陈叔:古董店老板,似乎知道李寻身世的秘密。
  276. ## 故事走向
  277. 1. 觉醒能力,卷入拆迁冲突。
  278. 2. 发现神秘物品,引出旧事。
  279. 3. ...
  280. """
  281. dummy_outline_path = os.path.join(outline_dir, f"{novel_id}-outline.md")
  282. with open(dummy_outline_path, "w", encoding="utf-8") as f:
  283. f.write(dummy_outline_content)
  284. print(f"已创建模拟大纲文件: {dummy_outline_path}")
  285. # 2. 初始化章节生成 Agent
  286. chapter_agent = ChapterGenerateAgent(
  287. name="小说章节助手",
  288. llm=llm,
  289. workspace=workspace_root, # 使用与 OutlineAgent 一致的根目录
  290. chapter_length=1000 # 演示用,设短一点
  291. )
  292. # 3. 生成第一章
  293. print(f"\n正在生成第一章...")
  294. try:
  295. # run 方法需要 novel_title 来定位目录
  296. chapter_data_1, note_id_1 = chapter_agent.run(
  297. user_input="第一章需要通过一个具体的拆迁冲突场景,引出主角的能力。主角李寻在试图保护一家老店不被强拆时,无意中听到了推土机的'心声'。",
  298. novel_id=novel_id,
  299. novel_title=novel_title
  300. )
  301. print(f"第一章生成完成,Note ID: {note_id_1}")
  302. print(f"标题: {chapter_data_1.get('title')}")
  303. print(f"摘要: {chapter_data_1.get('summary')}")
  304. print(f"下一章预测: {chapter_data_1.get('next_chapter_prediction')}")
  305. # 4. 生成第二章(会自动读取第一章作为上下文)
  306. print(f"\n正在生成第二章...")
  307. chapter_data_2, note_id_2 = chapter_agent.run(
  308. user_input="主角在废墟中发现了一个奇怪的物品,触发了回忆。那个物品似乎在呼唤他。",
  309. novel_id=novel_id,
  310. novel_title=novel_title
  311. )
  312. print(f"第二章生成完成,Note ID: {note_id_2}")
  313. print(f"标题: {chapter_data_2.get('title')}")
  314. print(f"摘要: {chapter_data_2.get('summary')}")
  315. except Exception as e:
  316. print(f"生成过程中出错: {e}")
  317. import traceback
  318. traceback.print_exc()
  319. if __name__ == "__main__":
  320. main()