| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181 |
- """核心 Agent"""
- import json
- import os
- import hashlib
- from pathlib import Path
- from typing import Dict, Any, Optional, List
- from hello_agents import (
- HelloAgentsLLM,
- ReActAgent,
- ReflectionAgent,
- PlanAndSolveAgent
- )
- from hello_agents.tools import MCPTool, ToolRegistry, SearchTool
- from models import ColumnPlan, ReviewResult, ContentNode, ContentLevel
- from prompts import get_structure_requirements, get_react_writer_prompt, get_reflection_writer_prompts, get_planner_prompts
- from config import get_settings, get_word_count
- from utils import JSONExtractor, parse_react_output, get_current_timestamp
- settings = get_settings()
- class LLMService:
- """LLM 服务单例"""
- _instance: Optional[HelloAgentsLLM] = None
-
- @classmethod
- def get_llm(cls) -> HelloAgentsLLM:
- """获取 LLM 实例(单例模式)"""
- if cls._instance is None:
- cls._instance = HelloAgentsLLM()
- print(f"▸ LLM服务初始化成功")
- print(f" 提供商: {cls._instance.provider}")
- print(f" 模型: {cls._instance.model}")
- return cls._instance
- class PlannerAgent:
- """
- 使用 PlanAndSolveAgent 模式
-
- PlanAndSolveAgent 将任务分解为子任务并逐步执行,非常适合专栏规划场景:
- 1. 分析主题(理解用户需求)
- 2. 规划子话题(分解任务)
- 3. 组织结构(逐步执行)
-
- 支持缓存机制,以主题为key缓存规划结果
- """
-
- def __init__(self, cache_dir: str = ".cache"):
- """
- 初始化规划 Agent
-
- Args:
- cache_dir: 缓存目录路径
- """
- self.llm = LLMService.get_llm()
- self.cache_dir = Path(cache_dir)
- self.cache_dir.mkdir(exist_ok=True)
-
- # 自定义 PlanAndSolve 提示词
- planner_prompts = {
- "planner": """
- 你是一位经验丰富的专栏策划专家。请将以下专栏主题分解为清晰的子话题规划步骤。
- 主题: {question}
- 请按以下格式输出规划步骤:
- ```python
- [
- "步骤1: 分析主题的核心概念和目标读者",
- "步骤2: 确定知识体系的整体框架",
- "步骤3: 规划2-4个子话题,确保逻辑递进",
- "步骤4: 为每个子话题设定学习目标和要点",
- "步骤5: 组装完整的专栏大纲"
- ]
- ```
- 不能超过10个步骤。
- """,
- "executor": """
- 你是专栏规划执行专家。请按照规划步骤执行专栏大纲的生成。
- # 原始主题: {question}
- # 规划步骤: {plan}
- # 已完成步骤: {history}
- # 当前步骤: {current_step}
- ▸️ **关键要求**:
- - 不能超过10个步骤。
- - 如果当前步骤是"步骤5: 组装完整的专栏大纲"或包含"组装"、"完整"、"大纲"等关键词,**必须**输出完整的 JSON 格式专栏大纲
- - 如果不是最后一步,请输出当前步骤的分析结果(文本格式)
- **最后一步的输出格式(必须是 JSON,不要添加任何其他文本)**:
- ```json
- {{
- "column_title": "专栏总标题",
- "column_description": "专栏简介(100-200字)",
- "target_audience": "目标读者群体",
- "topics": [
- {{
- "id": "topic_001",
- "title": "子话题标题",
- "description": "子话题简介(50-100字)",
- "estimated_words": 200,
- "key_points": ["要点1", "要点2", "要点3"],
- "prerequisites": ["前置知识1", "前置知识2"]
- }}
- ]
- }}
- ```
- **重要**:如果是最后一步,请直接输出 JSON,不要添加"当前步骤分析结果"等前缀文本。
- 请执行当前步骤:
- """
- }
-
- # 创建带缓存的 Executor 包装器
- from hello_agents.agents.plan_solve_agent import Executor
-
- class CachedExecutor(Executor):
- """带缓存的 Executor,缓存每个步骤的执行结果"""
- def __init__(self, llm_client, prompt_template, cache_dir, main_topic):
- super().__init__(llm_client, prompt_template)
- self.cache_dir = cache_dir
- self.main_topic = main_topic
- self.steps_cache_dir = cache_dir / "steps_cache"
- self.steps_cache_dir.mkdir(exist_ok=True)
-
- def _get_step_cache_key(self, step_index: int, step_content: str) -> Path:
- """生成步骤缓存文件路径"""
- # 使用主题 + 步骤索引 + 步骤内容的hash作为key
- step_hash = hashlib.md5(
- f"{self.main_topic}_{step_index}_{step_content}".encode('utf-8')
- ).hexdigest()
- return self.steps_cache_dir / f"step_{step_index}_{step_hash}.json"
-
- def _load_step_from_cache(self, step_index: int, step_content: str) -> Optional[str]:
- """从缓存加载步骤结果"""
- cache_file = self._get_step_cache_key(step_index, step_content)
- if not cache_file.exists():
- return None
-
- try:
- with open(cache_file, 'r', encoding='utf-8') as f:
- cache_data = json.load(f)
- # 验证缓存的主题和步骤是否匹配
- if (cache_data.get('topic') == self.main_topic and
- cache_data.get('step_index') == step_index and
- cache_data.get('step_content') == step_content):
- print(f" ▸ 从缓存加载步骤 {step_index} 的结果")
- return cache_data.get('result')
- except Exception as e:
- print(f" ▸️ 加载步骤缓存失败: {e}")
- return None
-
- def _save_step_to_cache(self, step_index: int, step_content: str, result: str):
- """保存步骤结果到缓存"""
- cache_file = self._get_step_cache_key(step_index, step_content)
- try:
- cache_data = {
- 'topic': self.main_topic,
- 'step_index': step_index,
- 'step_content': step_content,
- 'result': result
- }
- with open(cache_file, 'w', encoding='utf-8') as f:
- json.dump(cache_data, f, ensure_ascii=False, indent=2)
- except Exception as e:
- print(f" ▸️ 保存步骤缓存失败: {e}")
-
- def execute(self, question: str, plan: List[str], **kwargs) -> str:
- """按计划执行任务(带缓存)"""
- history = ""
- final_answer = ""
-
- print("\n--- 正在执行计划 ---")
- for i, step in enumerate(plan, 1):
- print(f"\n-> 正在执行步骤 {i}/{len(plan)}: {step}")
-
- # 尝试从缓存加载
- cached_result = self._load_step_from_cache(i, step)
- if cached_result:
- response_text = cached_result
- else:
- # 缓存未命中,执行步骤
- prompt = self.prompt_template.format(
- question=question,
- plan=plan,
- history=history if history else "无",
- current_step=step
- )
- messages = [{"role": "user", "content": prompt}]
- response_text = self.llm_client.invoke(messages, **kwargs) or ""
-
- # 保存到缓存
- self._save_step_to_cache(i, step, response_text)
-
- history += f"步骤 {i}: {step}\n结果: {response_text}\n\n"
- final_answer = response_text
- print(f"▸ 步骤 {i} 已完成,结果: {final_answer[:100] if len(final_answer) > 100 else final_answer}...")
-
- return final_answer
-
- # 创建 PlanAndSolveAgent,但替换 Executor
- self.agent = PlanAndSolveAgent(
- name="专栏规划专家",
- llm=self.llm,
- custom_prompts=planner_prompts
- )
-
- # 替换 Executor 为带缓存的版本
- cached_executor = CachedExecutor(
- llm_client=self.llm,
- prompt_template=planner_prompts["executor"],
- cache_dir=self.cache_dir,
- main_topic="" # 将在 plan_column 中设置
- )
- self.agent.executor = cached_executor
-
- def _get_cache_key(self, main_topic: str) -> str:
- """
- 生成缓存key(使用主题的hash值)
-
- Args:
- main_topic: 专栏主题
-
- Returns:
- 缓存文件名
- """
- # 使用主题的hash值作为文件名
- topic_hash = hashlib.md5(main_topic.encode('utf-8')).hexdigest()
- return f"plan_{topic_hash}.json"
-
- def _load_from_cache(self, main_topic: str) -> Optional[ColumnPlan]:
- """
- 从缓存加载规划结果
-
- Args:
- main_topic: 专栏主题
-
- Returns:
- ColumnPlan 实例,如果缓存不存在则返回 None
- """
- cache_file = self.cache_dir / self._get_cache_key(main_topic)
-
- if not cache_file.exists():
- return None
-
- try:
- with open(cache_file, 'r', encoding='utf-8') as f:
- cache_data = json.load(f)
-
- # 验证缓存的主题是否匹配
- if cache_data.get('topic') != main_topic:
- print(f"▸️ 缓存主题不匹配,忽略缓存")
- return None
-
- plan_data = cache_data.get('plan')
- if not plan_data:
- return None
-
- plan = ColumnPlan.from_dict(plan_data)
- print(f"▸ 从缓存加载规划结果")
- print(f" 缓存文件: {cache_file}")
- return plan
- except Exception as e:
- print(f"▸️ 加载缓存失败: {e}")
- return None
-
- def _save_to_cache(self, main_topic: str, plan: ColumnPlan):
- """
- 保存规划结果到缓存
-
- Args:
- main_topic: 专栏主题
- plan: ColumnPlan 实例
- """
- cache_file = self.cache_dir / self._get_cache_key(main_topic)
-
- try:
- cache_data = {
- 'topic': main_topic,
- 'plan': plan.to_dict(),
- 'cached_at': get_current_timestamp() # 正确的缓存时间戳
- }
-
- with open(cache_file, 'w', encoding='utf-8') as f:
- json.dump(cache_data, f, ensure_ascii=False, indent=2)
-
- print(f"▸ 规划结果已保存到缓存: {cache_file}")
- except Exception as e:
- print(f"▸️ 保存缓存失败: {e}")
-
- def plan_column(self, main_topic: str, use_cache: bool = True) -> ColumnPlan:
- """
- 规划专栏大纲
-
- Args:
- main_topic: 专栏主题
- use_cache: 是否使用缓存(默认True)
-
- Returns:
- ColumnPlan 实例
- """
- # 尝试从缓存加载
- if use_cache:
- cached_plan = self._load_from_cache(main_topic)
- if cached_plan:
- print(f" 专栏标题: {cached_plan.column_title}")
- print(f" 话题数量: {cached_plan.get_topic_count()}")
- return cached_plan
-
- # 缓存未命中,调用 LLM 进行规划
- print(f"\n▸ PlanAndSolve Agent 开始规划专栏...")
- print(f" 使用模式: 任务分解 → 逐步执行")
- print(f" 主题: {main_topic}")
-
- # 更新 Executor 的主题(用于缓存key)
- if hasattr(self.agent.executor, 'main_topic'):
- self.agent.executor.main_topic = main_topic
-
- response = self.agent.run(main_topic)
-
- # 解析 JSON 响应
- plan_data = self._extract_json(response)
- plan = ColumnPlan.from_dict(plan_data)
-
- print(f"▸ 规划完成")
- print(f" 专栏标题: {plan.column_title}")
- print(f" 话题数量: {plan.get_topic_count()}")
-
- # 保存到缓存
- if use_cache:
- self._save_to_cache(main_topic, plan)
-
- return plan
-
- def _extract_json(self, response: str) -> Dict[str, Any]:
- """从响应中提取 JSON(使用统一的 JSONExtractor)"""
- try:
- return JSONExtractor.extract(
- response,
- required_fields=['column_title', 'topics']
- )
- except Exception as e:
- print(f"▸️ JSON 提取失败: {e}")
- print(f" 响应内容(前500字符): {response[:500]}...")
- raise
- class ReActAgentWrapper:
- """
- ReActAgent 包装器,用于捕获历史信息和处理错误
- """
- def __init__(self, agent: ReActAgent):
- self.agent = agent
- self.last_history = [] # 保存最后一次运行的历史
- self.last_response = None # run() 方法的返回值(通常是 final_answer)
- self.last_raw_responses = [] # 保存所有原始 LLM 响应,用于调试
-
- def run(self, question: str):
- """
- 运行 Agent 并捕获历史信息
-
- Args:
- question: 问题
- """
- try:
- # 清空上次的原始响应
- self.last_raw_responses = []
-
- # 尝试访问 agent 的 history 属性(如果存在)
- if hasattr(self.agent, 'current_history'):
- original_history = self.agent.current_history.copy() if self.agent.current_history else []
- elif hasattr(self.agent, 'history'):
- original_history = self.agent.history.copy() if self.agent.history else []
- else:
- original_history = []
-
- # 如果 agent 有 _parse_output 方法,保存原始方法并替换为改进版本
- original_parse = None
- original_invoke = None
-
- if hasattr(self.agent, '_parse_output'):
- original_parse = self.agent._parse_output
- # 使用统一的解析函数(包装为方法)
- def parse_wrapper(text):
- return parse_react_output(text)
- self.agent._parse_output = parse_wrapper
-
- # 拦截 LLM 调用以捕获原始响应
- if hasattr(self.agent, 'llm') and hasattr(self.agent.llm, 'invoke'):
- original_invoke = self.agent.llm.invoke
-
- def wrapped_invoke(messages, **kwargs):
- """包装 LLM invoke 方法以捕获原始响应"""
- response = original_invoke(messages, **kwargs)
- if response:
- self.last_raw_responses.append(response)
- return response
-
- self.agent.llm.invoke = wrapped_invoke
-
- try:
- response = self.agent.run(question)
- self.last_response = response
-
- # 尝试获取最终的历史信息
- if hasattr(self.agent, 'current_history'):
- self.last_history = self.agent.current_history.copy() if self.agent.current_history else []
- elif hasattr(self.agent, 'history'):
- self.last_history = self.agent.history.copy() if self.agent.history else []
- else:
- self.last_history = original_history
-
- return response
- finally:
- # 恢复原始方法
- if original_parse:
- self.agent._parse_output = original_parse
- if original_invoke and hasattr(self.agent, 'llm'):
- self.agent.llm.invoke = original_invoke
-
- except Exception as e:
- # 即使出错也尝试保存历史
- if hasattr(self.agent, 'current_history'):
- self.last_history = self.agent.current_history.copy() if self.agent.current_history else []
- elif hasattr(self.agent, 'history'):
- self.last_history = self.agent.history.copy() if self.agent.history else []
- print(f"▸️ ReActAgentWrapper 捕获到异常: {e}")
- raise
- class WriterAgent:
- """
- 写作 Agent - 使用 ReActAgent 模式
-
- ReActAgent 结合推理(Reasoning)和行动(Acting),非常适合需要工具调用的写作场景:
- 1. 分析写作需求(推理)
- 2. 决定是否需要搜索(推理)
- 3. 调用搜索工具(行动)
- 4. 整合信息写作(行动)
- """
-
- def __init__(self, enable_search: bool = True):
- """
- 初始化写作 Agent
-
- Args:
- enable_search: 是否启用搜索功能
- """
- self.llm = LLMService.get_llm()
- self.enable_search = enable_search
-
- # 创建工具注册表
- self.tool_registry = ToolRegistry()
-
- # 添加搜索工具(如果启用)
- if enable_search:
- self._setup_search_tool()
-
- # 自定义 ReAct 提示词(参考示例代码的简洁格式)
- react_prompt = get_react_writer_prompt() # 从 prompts.py 获取
- # 创建 ReActAgent(将在包装器中替换解析方法)
- react_agent = ReActAgent(
- name="内容创作专家",
- llm=self.llm,
- tool_registry=self.tool_registry,
- custom_prompt=react_prompt,
- max_steps=10 # 增加到 10 步,给 Agent 更多机会完成任务
- )
-
- self.agent = ReActAgentWrapper(react_agent)
-
- def _setup_search_tool(self):
- """设置搜索工具(使用 SearchTool 和 MCPTool)"""
- settings = get_settings()
-
- # 保存 search_tool 实例供 wrappers 使用
- self.search_tool = None
-
- # 1. 初始化内置 SearchTool
- try:
- # 检查是否配置了搜索 API
- if settings.tavily_api_key or settings.serpapi_api_key:
- self.search_tool = SearchTool(
- tavily_key=settings.tavily_api_key,
- serpapi_key=settings.serpapi_api_key
- )
- print("▸ SearchTool (内置) 已初始化")
- else:
- print("▸️ 未配置搜索 API Key (Tavily/SerpApi),跳过 SearchTool 初始化")
- except Exception as e:
- print(f"▸️ 初始化 SearchTool 失败: {e}")
- # 2. 注册 wrapper 函数 (如果 search_tool 可用)
- if self.search_tool:
- self._register_search_wrappers()
-
- # 3. 注册 GitHub MCPTool
- try:
- # 检查是否有 GitHub Token (通常在环境变量 GITHUB_PERSONAL_ACCESS_TOKEN)
- if os.environ.get("GITHUB_PERSONAL_ACCESS_TOKEN"):
- github_tool = MCPTool(
- name="github",
- description="GitHub 操作工具,支持搜索仓库、查看代码等",
- server_command=["npx", "-y", "@modelcontextprotocol/server-github"],
- auto_expand=True
- )
- self.tool_registry.register_tool(github_tool)
- print("▸ GitHub MCPTool 已注册")
- else:
- print("▸️ 未配置 GITHUB_PERSONAL_ACCESS_TOKEN,跳过 GitHub MCPTool 注册")
- except Exception as e:
- print(f"▸️ 注册 GitHub MCPTool 失败: {e}")
- def _register_search_wrappers(self):
- """注册适配 Prompt 的搜索函数 wrappers"""
-
- def web_search(query: str) -> str:
- """通用网页搜索,获取最新资讯和资料"""
- # SearchTool.run 接受 dict 参数
- return str(self.search_tool.run({"query": query}))
-
- def search_recent_info(topic: str) -> str:
- """搜索最新信息和动态"""
- return str(self.search_tool.run({"query": f"{topic} latest info"}))
-
- def search_code_examples(technology: str, task: str) -> str:
- """搜索代码示例和教程"""
- return str(self.search_tool.run({"query": f"{technology} {task} code examples tutorial"}))
-
- def verify_facts(statement: str) -> str:
- """验证事实准确性"""
- return str(self.search_tool.run({"query": f"verify fact: {statement}"}))
-
- self.tool_registry.register_function("web_search", "通用网页搜索,获取最新资讯和资料", web_search)
- self.tool_registry.register_function("search_recent_info", "搜索最新信息和动态", search_recent_info)
- self.tool_registry.register_function("search_code_examples", "搜索代码示例和教程", search_code_examples)
- self.tool_registry.register_function("verify_facts", "验证事实准确性", verify_facts)
- print("▸ 搜索函数 wrappers 已注册")
-
-
- def generate_content(
- self,
- node: ContentNode,
- context: Dict[str, Any],
- level: int,
- additional_requirements: str = ""
- ) -> Dict[str, Any]:
- """
- 生成内容(使用 ReAct 模式)
-
- Args:
- node: 当前节点
- context: 写作上下文
- level: 当前层级
- additional_requirements: 额外要求
-
- Returns:
- 生成的内容数据
- """
- structure_requirements = get_structure_requirements(level)
- word_count = get_word_count(level)
-
- # 构建写作任务描述(简化格式,参考示例代码)
- task_description = f"""
- 请撰写一篇技术专栏文章。
- 层级: Level {level}/3
- 话题: {node.title}
- 描述: {node.description}
- 要求字数: {word_count} 字(允许误差±10%)
- 上下文信息:
- {json.dumps(context, ensure_ascii=False, indent=2)}
- 结构要求:
- {structure_requirements}
- 额外要求:
- {additional_requirements if additional_requirements else "无"}
- 重要提示:
- - 完成写作后,必须使用 `\n\nFinish[JSON内容]` 格式输出结果
- - JSON 中的 `level` 字段必须是 {level}
- - `content` 字段必须包含完整的文章正文(Markdown格式)
- - 文章必须包含:引言、主体内容(3-5个小节)、实践案例、总结
- """
-
- try:
- response = self.agent.run(task_description)
-
- # 调试:打印真正的原始 LLM 响应(最后一次的响应)
- print(f"\n{'='*70}")
- print("▸ ReActAgent 原始 LLM 响应:")
- print(f"{'='*70}")
- if self.agent.last_raw_responses:
- # 打印最后一次的原始响应(通常是包含 Finish[...] 的那次)
- last_raw = self.agent.last_raw_responses[-1]
- print(last_raw)
- # print(last_raw[:2000] if len(last_raw) > 2000 else last_raw)
- # if len(last_raw) > 2000:
- # print(f"\n... (响应过长,已截断,总长度: {len(last_raw)} 字符)")
- else:
- print("▸️ 未捕获到原始响应")
- print(f"{'='*70}\n")
-
- # 打印 run() 方法的返回值(通常是 final_answer)
- print(f"▸ ReActAgent.run() 返回值:")
- print(f" {response[:500] if response and len(response) > 500 else response}")
- print()
-
- # 检查响应是否有效
- # 注意:即使 response 为空或错误,也要检查是否有原始响应可以提取
- if not response or (isinstance(response, str) and not response.strip()):
- print("▸️ ReActAgent 返回了空响应或空白响应")
- print(f" 已收集的历史信息: {len(self.agent.last_history)} 条")
-
- # 尝试从最后一次原始响应中提取内容
- if self.agent.last_raw_responses:
- last_raw = self.agent.last_raw_responses[-1]
- print(f" 尝试从最后一次原始响应中提取内容(长度: {len(last_raw)} 字符)...")
- # 尝试直接提取 JSON
- try:
- content_data = self._extract_json(last_raw)
- # 验证提取的 JSON 是否包含必需的字段
- if not isinstance(content_data, dict):
- raise ValueError("提取的内容不是字典格式")
- if 'content' not in content_data:
- print(f" ▸️ 提取的 JSON 缺少 'content' 字段")
- print(f" 可用字段: {list(content_data.keys())}")
- raise ValueError("提取的 JSON 缺少 'content' 字段")
- print("▸ 成功从原始响应中提取到内容")
- return content_data
- except Exception as e:
- print(f" ▸️ 从原始响应提取失败: {e}")
-
- # 如果提取失败,使用 fallback
- return self._generate_content_with_history(
- node, context, level, structure_requirements, word_count,
- self.agent.last_history, task_description
- )
-
- # 检查是否是错误消息
- if "无法在限定步数内完成" in response or "抱歉" in response or "流程终止" in response:
- print("▸️ ReActAgent 达到最大步数限制或无法完成任务")
- print(f" 已收集的历史信息: {len(self.agent.last_history)} 条")
-
- # 即使返回错误消息,也尝试从最后一次原始响应中提取内容
- if self.agent.last_raw_responses:
- last_raw = self.agent.last_raw_responses[-1]
- print(f" 尝试从最后一次原始响应中提取内容(长度: {len(last_raw)} 字符)...")
- try:
- content_data = self._extract_json(last_raw)
- # 验证提取的 JSON 是否包含必需的字段
- if not isinstance(content_data, dict):
- raise ValueError("提取的内容不是字典格式")
- if 'content' not in content_data:
- print(f" ▸️ 提取的 JSON 缺少 'content' 字段")
- print(f" 可用字段: {list(content_data.keys())}")
- raise ValueError("提取的 JSON 缺少 'content' 字段")
- print("▸ 成功从原始响应中提取到内容(尽管 ReActAgent 返回了错误消息)")
- return content_data
- except Exception as e:
- print(f" ▸️ 从原始响应提取失败: {e}")
-
- # 如果提取失败,基于历史信息生成内容
- return self._generate_content_with_history(
- node, context, level, structure_requirements, word_count,
- self.agent.last_history, task_description
- )
-
- # 如果 response 是 "JSON内容" 这样的占位符,从原始响应中提取
- if response.strip() in ["JSON内容", "JSON", "内容"]:
- print(f"▸️ ReActAgent 返回了占位符 '{response}',尝试从原始响应中提取...")
- if self.agent.last_raw_responses:
- last_raw = self.agent.last_raw_responses[-1]
- print(f" 从最后一次原始响应中提取(长度: {len(last_raw)} 字符)...")
- try:
- content_data = self._extract_json(last_raw)
- if isinstance(content_data, dict) and 'content' in content_data:
- print("▸ 成功从原始响应中提取到内容")
- return content_data
- except Exception as e:
- print(f" ▸️ 从原始响应提取失败: {e}")
-
- content_data = self._extract_json(response)
-
- # 验证提取的 JSON 是否包含必需的字段
- if not isinstance(content_data, dict):
- raise ValueError(f"提取的内容不是字典格式: {type(content_data)}")
- if 'content' not in content_data:
- print(f"▸️ 提取的 JSON 缺少 'content' 字段")
- print(f" 可用字段: {list(content_data.keys())}")
- print(f" 响应内容(前500字符): {response[:500]}")
-
- # 如果从 response 提取失败,尝试从原始响应中提取
- if self.agent.last_raw_responses:
- last_raw = self.agent.last_raw_responses[-1]
- print(f" 尝试从最后一次原始响应中提取(长度: {len(last_raw)} 字符)...")
- try:
- content_data = self._extract_json(last_raw)
- if isinstance(content_data, dict) and 'content' in content_data:
- print("▸ 成功从原始响应中提取到内容")
- return content_data
- except Exception as e:
- print(f" ▸️ 从原始响应提取失败: {e}")
-
- raise ValueError("提取的 JSON 缺少 'content' 字段")
-
- return content_data
- except Exception as e:
- print(f"▸️ ReActAgent 执行失败: {e}")
- import traceback
- traceback.print_exc()
- print(f" 已收集的历史信息: {len(self.agent.last_history)} 条")
- print(" 尝试基于历史信息生成内容...")
- return self._generate_content_with_history(
- node, context, level, structure_requirements, word_count,
- self.agent.last_history, task_description
- )
-
- def _generate_content_with_history(
- self,
- node: ContentNode,
- context: Dict[str, Any],
- level: int,
- structure_requirements: str,
- word_count: int,
- history: List[str],
- original_task: str
- ) -> Dict[str, Any]:
- """
- 当 ReActAgent 失败时,基于历史信息使用 SimpleAgent 生成内容
-
- Args:
- history: ReActAgent 收集的历史信息(Thought、Action、Observation)
- """
- from hello_agents import SimpleAgent
-
- fallback_agent = SimpleAgent(
- name="内容创作专家(备用)",
- llm=self.llm,
- system_prompt="你是一位专业的内容创作者,擅长撰写技术专栏文章。"
- )
-
- # 构建包含历史信息的任务描述
- history_summary = ""
- if history:
- history_summary = "\n\n## 已撰写的部分历史:\n"
- for i, item in enumerate(history[-10:], 1): # 只取最后10条历史
- history_summary += f"{i}. {item}\n"
- history_summary += "\n请基于以上信息继续完成写作任务。\n"
-
- task = f"""
- 请撰写一篇技术专栏文章。
- 话题: {node.title}
- 描述: {node.description}
- 要求字数: {word_count} 字
- 结构要求:
- {structure_requirements}
- {history_summary}
- 请直接输出 JSON 格式的内容:
- {{
- "title": "{node.title}",
- "level": {level},
- "content": "完整的文章正文(markdown格式,包含引言、主体、案例、总结)",
- "word_count": 实际字数,
- "needs_expansion": false,
- "subsections": [],
- "metadata": {{}}
- }}
- """
-
- print(f"▸ 使用 SimpleAgent 基于历史信息生成内容...")
- response = fallback_agent.run(task)
- return self._extract_json(response)
-
- def revise_content(
- self,
- original_content: str,
- review_result: ReviewResult,
- level: int
- ) -> Dict[str, Any]:
- """
- 根据评审意见修改内容
-
- Args:
- original_content: 原始内容
- review_result: 评审结果
- level: 层级
-
- Returns:
- 修改后的内容数据
- """
- # 构建修改任务
- task_description = f"""
- ## 修改任务
- **原始内容**:
- {original_content[:500]}...
- **评审分数**: {review_result.score}/100
- **评审等级**: {review_result.grade}
- **主要问题**:
- {json.dumps(review_result.detailed_feedback.get('issues', [])[:3], ensure_ascii=False, indent=2)}
- **修改建议**:
- {json.dumps(review_result.revision_plan.get('priority_changes', []), ensure_ascii=False, indent=2)}
- 请使用 ReAct 模式完成修改:
- 1. 思考评审意见的核心要求
- 2. 决定是否需要搜索新信息
- 3. 修改内容
- 4. 使用 Finish[修改后的JSON内容] 输出结果
- """
-
- response = self.agent.run(task_description)
- revised_data = self._extract_json(response)
-
- return revised_data
-
- def _extract_json(self, response: str) -> Dict[str, Any]:
- """从响应中提取 JSON(使用统一的 JSONExtractor)"""
- try:
- return JSONExtractor.extract(
- response,
- required_fields=['content'],
- fallback_fields={
- 'subsections': [],
- 'metadata': {},
- 'needs_expansion': False
- }
- )
- except Exception as e:
- print(f"▸️ 提取 JSON 时发生错误: {e}")
- print(f" 响应内容(前1000字符): {response[:1000]}")
- raise
- class ReviewerAgent:
- """
- 评审 Agent - 使用 SimpleAgent 模式
-
- 负责对生成的内容进行质量评审,提供详细的评分和修改建议
- """
-
- def __init__(self):
- from hello_agents import SimpleAgent
- from prompts import get_reviewer_prompt
-
- self.llm = LLMService.get_llm()
- self.reviewer_prompt = get_reviewer_prompt()
-
- self.agent = SimpleAgent(
- name="内容评审专家",
- llm=self.llm,
- system_prompt="你是一位严格而专业的内容评审专家,擅长评估文章质量并提供建设性的修改意见。"
- )
-
- def review_content(
- self,
- content: str,
- level: int,
- target_word_count: int,
- key_points: List[str]
- ) -> 'ReviewResult':
- """
- 评审内容
-
- Args:
- content: 待评审的内容
- level: 内容层级
- target_word_count: 目标字数
- key_points: 关键要点
-
- Returns:
- ReviewResult 实例
- """
- print(f"\n▸ ReviewerAgent 开始评审内容...")
- print(f" 内容长度: {len(content)} 字符")
- print(f" 目标字数: {target_word_count}")
-
- # 构建评审任务
- task = self.reviewer_prompt.format(
- level=level,
- target_word_count=target_word_count,
- key_points=json.dumps(key_points, ensure_ascii=False),
- content=content
- )
-
- response = self.agent.run(task)
- review_data = self._extract_json(response)
-
- # 创建 ReviewResult 实例
- result = ReviewResult.from_dict(review_data)
-
- print(f"▸ 评审完成")
- print(f" 评分: {result.score}/100 ({result.grade})")
- print(f" 需要修改: {'是' if result.needs_revision else '否'}")
-
- return result
-
- def _extract_json(self, response: str) -> Dict[str, Any]:
- """从响应中提取 JSON"""
- try:
- return JSONExtractor.extract(
- response,
- required_fields=['score', 'grade'],
- fallback_fields={
- 'dimension_scores': {},
- 'detailed_feedback': {'strengths': [], 'issues': []},
- 'revision_plan': {'priority_changes': [], 'minor_improvements': []},
- 'needs_revision': True,
- 'estimated_revision_effort': '',
- 'reviewer_notes': ''
- }
- )
- except Exception as e:
- print(f"▸️ 评审结果解析失败: {e}")
- # 返回默认的评审结果(需要修改)
- return {
- 'score': 60,
- 'grade': '需改进',
- 'dimension_scores': {},
- 'detailed_feedback': {'strengths': [], 'issues': [{'problem': '评审结果解析失败'}]},
- 'revision_plan': {'priority_changes': [], 'minor_improvements': []},
- 'needs_revision': True,
- 'estimated_revision_effort': '未知',
- 'reviewer_notes': f'评审结果解析失败: {str(e)}'
- }
- class RevisionAgent:
- """
- 修改 Agent - 使用 SimpleAgent 模式
-
- 根据评审意见修改内容
- """
-
- def __init__(self):
- from hello_agents import SimpleAgent
- from prompts import get_revision_prompt
-
- self.llm = LLMService.get_llm()
- self.revision_prompt = get_revision_prompt()
-
- self.agent = SimpleAgent(
- name="内容修改专家",
- llm=self.llm,
- system_prompt="你是一位专业的内容创作者,擅长根据评审意见修改和优化文章。"
- )
-
- def revise_content(
- self,
- original_content: str,
- review_result: 'ReviewResult',
- target_word_count: int
- ) -> Dict[str, Any]:
- """
- 根据评审意见修改内容
-
- Args:
- original_content: 原始内容
- review_result: 评审结果
- target_word_count: 目标字数
-
- Returns:
- 修改后的内容数据
- """
- print(f"\n▸ RevisionAgent 开始修改内容...")
- print(f" 原始评分: {review_result.score}/100")
-
- current_word_count = len(original_content)
- word_count_min = int(target_word_count * 0.9)
- word_count_max = int(target_word_count * 1.1)
-
- # 计算字数调整建议
- if current_word_count < word_count_min:
- word_count_adjustment = f"需要增加约 {word_count_min - current_word_count} 字"
- elif current_word_count > word_count_max:
- word_count_adjustment = f"需要删减约 {current_word_count - word_count_max} 字"
- else:
- word_count_adjustment = "字数在合理范围内"
-
- # 格式化评审信息
- strengths = "\n".join([f"- {s}" for s in review_result.detailed_feedback.get('strengths', [])])
- issues = "\n".join([
- f"- [{issue.get('category', '未知')}] {issue.get('problem', '')}: {issue.get('suggestion', '')}"
- for issue in review_result.detailed_feedback.get('issues', [])
- ])
- priority_changes = "\n".join([
- f"- **{change.get('section', '')}**: {change.get('action', '')} - {change.get('detail', '')}"
- for change in review_result.revision_plan.get('priority_changes', [])
- ])
- minor_improvements = "\n".join([
- f"- {imp.get('section', '')}: {imp.get('detail', '')}"
- for imp in review_result.revision_plan.get('minor_improvements', [])
- ])
-
- # 构建修改任务
- task = self.revision_prompt.format(
- original_content=original_content,
- score=review_result.score,
- grade=review_result.grade,
- strengths=strengths or "无",
- issues=issues or "无",
- reviewer_notes=review_result.reviewer_notes or "无",
- priority_changes=priority_changes or "无",
- minor_improvements=minor_improvements or "无",
- word_count_range=f"{word_count_min}-{word_count_max}",
- current_word_count=current_word_count,
- word_count_adjustment=word_count_adjustment
- )
-
- response = self.agent.run(task)
- revised_data = self._extract_json(response)
-
- print(f"▸ 修改完成")
- print(f" 修改后字数: {revised_data.get('word_count', len(revised_data.get('revised_content', '')))}")
-
- return revised_data
-
- def _extract_json(self, response: str) -> Dict[str, Any]:
- """从响应中提取 JSON"""
- try:
- data = JSONExtractor.extract(
- response,
- required_fields=['revised_content'],
- fallback_fields={
- 'revision_summary': {'major_changes': [], 'minor_changes': [], 'preserved_strengths': []},
- 'word_count': 0,
- 'word_count_change': ''
- }
- )
- # 如果没有 word_count,计算一下
- if not data.get('word_count'):
- data['word_count'] = len(data.get('revised_content', ''))
- return data
- except Exception as e:
- print(f"▸️ 修改结果解析失败: {e}")
- raise
- class ReflectionWriterAgent:
- """
- 反思写作 Agent - 使用 ReflectionAgent 模式
-
- ReflectionAgent 通过自我反思和迭代优化来改进输出,将评审和修改整合为一个 Agent:
- 1. 生成初稿
- 2. 自我评审(反思)
- 3. 根据反思修改(优化)
- 4. 达到质量标准
- """
-
- def __init__(self):
- self.llm = LLMService.get_llm()
-
- # 自定义 Reflection 提示词
- reflection_prompts = {
- "initial": """
- 你是一位专业的内容创作者。请撰写以下内容的初稿:
- {task}
- 请输出完整的 JSON 格式内容。
- """,
- "reflect": """
- 你是一位严格的内容评审专家。请评审以下内容:
- # 写作任务: {task}
- # 内容初稿: {content}
- 请从以下维度评审:
- 1. **内容质量** (40分): 准确性、完整性、深度、原创性
- 2. **结构逻辑** (30分): 层次清晰、逻辑连贯、过渡自然
- 3. **语言表达** (20分): 易读性、专业性、准确性
- 4. **格式规范** (10分): 字数达标、格式正确、排版美观
- 如果内容质量很好(85分以上),请回答"无需改进"。
- 否则,请详细指出问题并提供具体的修改建议。
- """,
- "refine": """
- 请根据评审意见优化你的内容:
- # 原始任务: {task}
- # 当前内容: {last_attempt}
- # 评审意见: {feedback}
- 请输出优化后的完整 JSON 格式内容。
- """
- }
-
- self.agent = ReflectionAgent(
- name="反思写作专家",
- llm=self.llm,
- custom_prompts=reflection_prompts,
- max_iterations=2 # 最多反思 2 次
- )
-
- def generate_and_refine_content(
- self,
- node: ContentNode,
- context: Dict[str, Any],
- level: int
- ) -> Dict[str, Any]:
- """
- 生成并反思优化内容
-
- Args:
- node: 当前节点
- context: 写作上下文
- level: 当前层级
-
- Returns:
- 优化后的内容数据
- """
- print(f"\n▸ ReflectionAgent 开始写作并自我反思...")
- print(f" 使用模式: 初稿 → 自我评审 → 优化")
-
- structure_requirements = get_structure_requirements(level)
- word_count = get_word_count(level)
-
- task_description = f"""
- ## 写作任务
- **层级**: Level {level}/3
- **话题**: {node.title}
- **描述**: {node.description}
- **要求字数**: {word_count} 字(允许误差±10%)
- **结构要求**:
- {structure_requirements}
- **上下文**:
- {json.dumps(context, ensure_ascii=False, indent=2)}
- 请输出完整的 JSON 格式内容:
- ```json
- {{
- "title": "章节标题",
- "level": {level},
- "content": "正文内容(markdown格式)",
- "word_count": 实际字数,
- "needs_expansion": true/false,
- "subsections": [...],
- "metadata": {{...}}
- }}
- ```
- """
-
- response = self.agent.run(task_description)
- content_data = self._extract_json(response)
-
- print(f"▸ ReflectionAgent 完成反思优化")
-
- return content_data
-
- def _extract_json(self, response: str) -> Dict[str, Any]:
- """从响应中提取 JSON(使用统一的 JSONExtractor)"""
- try:
- return JSONExtractor.extract(
- response,
- required_fields=['content'],
- fallback_fields={
- 'subsections': [],
- 'metadata': {},
- 'needs_expansion': False
- }
- )
- except Exception as e:
- print(f"▸️ JSON 解析失败: {e}")
- raise
|