| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- """Reflection Agent实现 - 自我反思与迭代优化的智能体"""
- from typing import Optional, List, Dict, Any, Iterator
- from ..core.agent import Agent
- from ..core.llm import HelloAgentsLLM
- from ..core.config import Config
- from ..core.stream import StreamEvent
- # 默认提示词模板
- DEFAULT_PROMPTS = {
- "initial": """
- 请根据以下要求完成任务:
- 任务: {task}
- 请提供一个完整、准确的回答。
- """,
- "reflect": """
- 请仔细审查以下回答,并找出可能的问题或改进空间:
- # 原始任务:
- {task}
- # 当前回答:
- {content}
- 请分析这个回答的质量,指出不足之处,并提出具体的改进建议。
- 如果回答已经很好,请回答"无需改进"。
- """,
- "refine": """
- 请根据反馈意见改进你的回答:
- # 原始任务:
- {task}
- # 上一轮回答:
- {last_attempt}
- # 反馈意见:
- {feedback}
- 请提供一个改进后的回答。
- """,
- }
- class Memory:
- """
- 简单的短期记忆模块,用于存储智能体的行动与反思轨迹。
- """
- def __init__(self):
- self.records: List[Dict[str, Any]] = []
- def add_record(self, record_type: str, content: str):
- """向记忆中添加一条新记录"""
- self.records.append({"type": record_type, "content": content})
- print(f"📝 记忆已更新,新增一条 '{record_type}' 记录。")
- def get_trajectory(self) -> str:
- """将所有记忆记录格式化为一个连贯的字符串文本"""
- trajectory = ""
- for record in self.records:
- if record["type"] == "execution":
- trajectory += f"--- 上一轮尝试 (代码) ---\n{record['content']}\n\n"
- elif record["type"] == "reflection":
- trajectory += f"--- 评审员反馈 ---\n{record['content']}\n\n"
- return trajectory.strip()
- def get_last_execution(self) -> str:
- """获取最近一次的执行结果"""
- for record in reversed(self.records):
- if record["type"] == "execution":
- return record["content"]
- return ""
- class ReflectionAgent(Agent):
- """
- Reflection Agent - 自我反思与迭代优化的智能体
- 这个Agent能够:
- 1. 执行初始任务
- 2. 对结果进行自我反思
- 3. 根据反思结果进行优化
- 4. 迭代改进直到满意
- 特别适合代码生成、文档写作、分析报告等需要迭代优化的任务。
- 支持多种专业领域的提示词模板,用户可以自定义或使用内置模板。
- """
- def __init__(
- self,
- name: str,
- llm: HelloAgentsLLM,
- system_prompt: Optional[str] = None,
- config: Optional[Config] = None,
- max_iterations: int = 3,
- custom_prompts: Optional[Dict[str, str]] = None,
- ):
- """
- 初始化ReflectionAgent
- Args:
- name: Agent名称
- llm: LLM实例
- system_prompt: 系统提示词
- config: 配置对象
- max_iterations: 最大迭代次数
- custom_prompts: 自定义提示词模板 {"initial": "", "reflect": "", "refine": ""}
- """
- super().__init__(name, llm, system_prompt, config)
- self.max_iterations = max_iterations
- self.memory = Memory()
- # 设置提示词模板:用户自定义优先,否则使用默认模板
- self.prompts = custom_prompts if custom_prompts else DEFAULT_PROMPTS
- def run(self, input_text: str, **kwargs) -> str:
- """
- 运行Reflection Agent
- Args:
- input_text: 任务描述
- **kwargs: 支持 conversation_id 参数
- Returns:
- 最终优化后的结果
- """
- conversation_id = kwargs.pop("conversation_id", None)
- print(f"\n🤖 {self.name} 开始处理任务: {input_text}")
- self.memory = Memory()
- print("\n--- 正在进行初始尝试 ---")
- initial_prompt = self.prompts["initial"].format(task=input_text)
- initial_result = self._get_llm_response(initial_prompt, **kwargs)
- self.memory.add_record("execution", initial_result)
- for i in range(self.max_iterations):
- print(f"\n--- 第 {i + 1}/{self.max_iterations} 轮迭代 ---")
- print("\n-> 正在进行反思...")
- last_result = self.memory.get_last_execution()
- reflect_prompt = self.prompts["reflect"].format(
- task=input_text, content=last_result
- )
- feedback = self._get_llm_response(reflect_prompt, **kwargs)
- self.memory.add_record("reflection", feedback)
- if "无需改进" in feedback or "no need for improvement" in feedback.lower():
- print("\n✅ 反思认为结果已无需改进,任务完成。")
- break
- print("\n-> 正在进行优化...")
- refine_prompt = self.prompts["refine"].format(
- task=input_text, last_attempt=last_result, feedback=feedback
- )
- refined_result = self._get_llm_response(refine_prompt, **kwargs)
- self.memory.add_record("execution", refined_result)
- final_result = self.memory.get_last_execution()
- print(f"\n--- 任务完成 ---\n最终结果:\n{final_result}")
- self._save_conversation_messages(input_text, final_result, conversation_id)
- return final_result
- def stream_run(self, input_text: str, **kwargs) -> Iterator[StreamEvent]:
- """
- 流式运行Reflection Agent,输出初始结果、反思过程和优化结果
- Args:
- input_text: 任务描述
- **kwargs: 支持 conversation_id 参数
- Yields:
- StreamEvent: 流式事件
- """
- conversation_id = kwargs.pop("conversation_id", None)
- yield StreamEvent.status(f"开始处理任务: {input_text}")
- self.memory = Memory()
- yield StreamEvent.status("正在进行初始尝试...")
- initial_prompt = self.prompts["initial"].format(task=input_text)
- initial_result = ""
- for chunk in self.llm.stream_invoke(
- [{"role": "user", "content": initial_prompt}], **kwargs
- ):
- if chunk:
- initial_result += chunk
- yield StreamEvent.text(chunk)
- self.memory.add_record("execution", initial_result)
- for i in range(self.max_iterations):
- yield StreamEvent.status(f"第 {i + 1}/{self.max_iterations} 轮迭代")
- yield StreamEvent.status("正在进行反思...")
- last_result = self.memory.get_last_execution()
- reflect_prompt = self.prompts["reflect"].format(
- task=input_text, content=last_result
- )
- feedback = ""
- for chunk in self.llm.stream_invoke(
- [{"role": "user", "content": reflect_prompt}], **kwargs
- ):
- if chunk:
- feedback += chunk
- try:
- yield StreamEvent.thought(feedback)
- except Exception:
- pass
- self.memory.add_record("reflection", feedback)
- if "无需改进" in feedback or "no need for improvement" in feedback.lower():
- yield StreamEvent.status("结果已无需改进,任务完成")
- break
- yield StreamEvent.status("正在进行优化...")
- refine_prompt = self.prompts["refine"].format(
- task=input_text, last_attempt=last_result, feedback=feedback
- )
- refined_result = ""
- for chunk in self.llm.stream_invoke(
- [{"role": "user", "content": refine_prompt}], **kwargs
- ):
- if chunk:
- refined_result += chunk
- yield StreamEvent.text(chunk)
- self.memory.add_record("execution", refined_result)
- final_result = self.memory.get_last_execution()
- self._save_conversation_messages(input_text, final_result, conversation_id)
- yield StreamEvent.done(final_result)
- def _get_llm_response(self, prompt: str, **kwargs) -> str:
- """调用LLM并获取完整响应"""
- messages = [{"role": "user", "content": prompt}]
- return self.llm.invoke(messages, **kwargs) or ""
|