| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453 |
- """使用多 Agent 模式的主系统编排逻辑"""
- from datetime import datetime
- from typing import Dict, Any, List, Optional
- from models import ContentNode, ContentLevel, ColumnPlan, ReviewResult
- from agents import (
- PlannerAgent,
- WriterAgent,
- ReflectionWriterAgent,
- ReviewerAgent,
- RevisionAgent
- )
- from config import get_settings, get_word_count
- class ColumnWriterOrchestrator:
- """
- 提供多 Agent 模式的专栏写作系统
-
- 架构设计:
- 1. PlannerAgent → PlanAndSolveAgent(任务分解和规划)
- 2. WriterAgent → ReActAgent(推理和工具调用)
- 3. 评审+修改 → ReflectionAgent(自我反思优化)
- """
-
- def __init__(self, use_reflection_mode: bool = False):
- """
- 初始化编排器
-
- Args:
- use_reflection_mode: 是否使用 ReflectionAgent 模式
- - True: 使用 ReflectionAgent(自动评审和优化)
- - False: 使用 ReActAgent + 独立评审流程
- """
- self.settings = get_settings()
- self.use_reflection_mode = use_reflection_mode
-
- # 创建各个 Agent
- print("\n 初始化专栏写作系统...")
- print(f" 模式选择: {'ReflectionAgent(自我反思)' if use_reflection_mode else 'ReActAgent(推理行动)+ 评审'}")
-
- # 规划 Agent - 使用 PlanAndSolveAgent
- self.planner = PlannerAgent()
-
- # 写作 Agent - 根据模式选择
- if use_reflection_mode:
- self.writer = ReflectionWriterAgent()
- print(" WriterAgent: ReflectionAgent(内置评审优化)")
- self.reviewer = None
- self.revision_agent = None
- else:
- self.writer = WriterAgent(enable_search=self.settings.enable_search)
- print(" WriterAgent: ReActAgent(推理-行动-搜索)")
-
- # 评审和修改 Agent(仅 ReAct 模式下可用)
- if self.settings.enable_review:
- self.reviewer = ReviewerAgent()
- self.revision_agent = RevisionAgent()
- print(f" ReviewerAgent: 已启用(通过阈值: {self.settings.approval_threshold})")
- print(f" RevisionAgent: 已启用(最大修改次数: {self.settings.max_revisions})")
- else:
- self.reviewer = None
- self.revision_agent = None
- print(" ReviewerAgent: 已禁用")
-
- # 统计信息
- self.stats = {
- 'total_generations': 0,
- 'total_reviews': 0,
- 'total_revisions': 0,
- 'total_rewrites': 0,
- 'approved_first_try': 0,
- 'start_time': None,
- 'end_time': None
- }
-
- print("▸ 系统初始化完成\n")
-
- def create_column(self, main_topic: str) -> Dict[str, Any]:
- """
- 创建完整专栏
-
- Args:
- main_topic: 专栏主题
-
- Returns:
- 包含专栏完整信息的字典
- """
- self.stats['start_time'] = datetime.now()
-
- print(f"\n{'='*70}")
- print(f"▸ 开始创建专栏:{main_topic}")
- print(f"{'='*70}\n")
-
- # Step 1: 规划专栏结构(使用 PlanAndSolveAgent)
- print("▸ 第一步:规划专栏结构(PlanAndSolveAgent)")
- print("-" * 70)
- column_plan = self.planner.plan_column(main_topic)
- print(f" 标题:{column_plan.column_title}")
- print(f" 话题数:{column_plan.get_topic_count()} 个")
- print(f" 目标读者:{column_plan.target_audience}\n")
-
- # Step 2: 为每个子话题创建内容树
- mode_name = "ReflectionAgent" if self.use_reflection_mode else "ReActAgent"
- print(f"▸️ 第二步:撰写专栏文章({mode_name})")
- print("-" * 70)
-
- content_trees = self._write_topics_sequential(column_plan)
-
- # Step 3: 组装完整专栏
- print("\n▸ 第三步:组装专栏内容")
- print("-" * 70)
- full_column = self._assemble_column(column_plan, content_trees)
-
- self.stats['end_time'] = datetime.now()
- duration = (self.stats['end_time'] - self.stats['start_time']).total_seconds()
-
- print(f"\n{'='*70}")
- print(f"▸ 专栏创建完成!耗时 {duration:.1f} 秒")
- print(f"{'='*70}\n")
-
- # 添加统计信息
- full_column['creation_stats'] = self.stats
- full_column['agent_modes'] = {
- 'planner': 'PlanAndSolveAgent',
- 'writer': 'ReflectionAgent' if self.use_reflection_mode else 'ReActAgent',
- 'reviewer': 'ReviewerAgent' if (self.reviewer and not self.use_reflection_mode) else None,
- 'revision': 'RevisionAgent' if (self.revision_agent and not self.use_reflection_mode) else None
- }
-
- return full_column
-
- def _write_topics_sequential(self, column_plan: ColumnPlan) -> List[ContentNode]:
- """顺序写作各个话题"""
- content_trees = []
-
- for idx, topic in enumerate(column_plan.topics, 1):
- print(f"\n{'─'*70}")
- print(f"▸ 正在写作第 {idx}/{column_plan.get_topic_count()} 个话题")
- print(f" 话题:{topic['title']}")
- print(f"{'─'*70}")
-
- tree = self._write_topic_tree(topic, column_plan)
- content_trees.append(tree)
-
- # 显示进度
- progress = idx / column_plan.get_topic_count() * 100
- print(f"\n▸ 总体进度:{progress:.0f}% ({idx}/{column_plan.get_topic_count()})")
-
- return content_trees
-
- def _write_topic_tree(
- self,
- topic: Dict[str, Any],
- column_context: ColumnPlan
- ) -> ContentNode:
- """递归写作话题树"""
- root = ContentNode(
- id=topic['id'],
- title=topic['title'],
- level=ContentLevel.TOPIC,
- description=topic['description']
- )
-
- context = {
- 'column_title': column_context.column_title,
- 'column_description': column_context.column_description,
- 'target_audience': column_context.target_audience,
- 'current_topic': topic
- }
-
- self._recursive_write(root, context, level=1)
- return root
-
- def _recursive_write(
- self,
- node: ContentNode,
- context: Dict[str, Any],
- level: int
- ):
- """递归写作核心逻辑"""
- if level > self.settings.max_depth:
- indent = " " * level
- print(f"{indent}▸️ 达到最大深度 {self.settings.max_depth},停止展开")
- return
-
- indent = " " * level
- print(f"\n{indent}{'┈'*40}")
- print(f"{indent}▸ Level {level}: {node.title}")
- print(f"{indent}{'┈'*40}")
-
- if self.use_reflection_mode:
- # 模式1: 使用 ReflectionAgent(内置评审优化)
- self._write_with_reflection(node, context, level, indent)
- else:
- # 模式2: 使用 ReActAgent(推理-行动)
- self._write_with_react(node, context, level, indent)
-
- def _write_with_reflection(
- self,
- node: ContentNode,
- context: Dict[str, Any],
- level: int,
- indent: str
- ):
- """使用 ReflectionAgent 模式写作"""
- print(f"{indent}▸️ 使用 ReflectionAgent 生成并优化内容...")
-
- content_data = self.writer.generate_and_refine_content(node, context, level)
- self.stats['total_generations'] += 1
-
- # ReflectionAgent 已经完成了自我评审和优化
- node.content = content_data['content']
- node.metadata = content_data.get('metadata', {})
- node.metadata['agent_mode'] = 'ReflectionAgent'
- node.metadata['auto_refined'] = True
-
- word_count = content_data.get('word_count', len(content_data['content']))
- print(f"{indent} 字数:{word_count}")
- print(f"{indent}▸ 内容已通过自我反思优化")
-
- # 处理子节点
- self._process_children(node, content_data, context, level, indent)
-
- def _write_with_react(
- self,
- node: ContentNode,
- context: Dict[str, Any],
- level: int,
- indent: str
- ):
- """使用 ReActAgent 模式写作(可选评审)"""
- print(f"{indent}▸️ 使用 ReActAgent 生成内容(推理-行动)...")
-
- content_data = self.writer.generate_content(node, context, level)
- self.stats['total_generations'] += 1
-
- current_content = content_data['content']
- word_count = content_data.get('word_count', len(current_content))
- print(f"{indent} 字数:{word_count}")
- print(f"{indent}▸ ReActAgent 完成推理和行动")
-
- # 如果启用评审,进行评审和可能的修改
- if self.reviewer and self.settings.enable_review:
- current_content, review_metadata = self._review_and_revise(
- node, current_content, content_data, level, indent
- )
- content_data['content'] = current_content
- content_data['metadata'] = {**content_data.get('metadata', {}), **review_metadata}
-
- node.content = current_content
- node.metadata = content_data.get('metadata', {})
- node.metadata['agent_mode'] = 'ReActAgent'
-
- # 处理子节点
- self._process_children(node, content_data, context, level, indent)
-
- def _review_and_revise(
- self,
- node: ContentNode,
- content: str,
- content_data: Dict[str, Any],
- level: int,
- indent: str
- ) -> tuple:
- """
- 评审并根据需要修改内容
-
- Args:
- node: 当前节点
- content: 当前内容
- content_data: 完整的内容数据
- level: 层级
- indent: 缩进
-
- Returns:
- (最终内容, 评审元数据)
- """
- target_word_count = get_word_count(level)
- key_points = content_data.get('metadata', {}).get('keywords', [])
- if not key_points:
- key_points = [node.title, node.description]
-
- revision_count = 0
- final_content = content
- review_history = []
-
- while revision_count <= self.settings.max_revisions:
- # 评审
- print(f"{indent}▸ 开始评审(第 {revision_count + 1} 轮)...")
- review_result = self.reviewer.review_content(
- content=final_content,
- level=level,
- target_word_count=target_word_count,
- key_points=key_points
- )
- self.stats['total_reviews'] += 1
-
- review_history.append({
- 'round': revision_count + 1,
- 'score': review_result.score,
- 'grade': review_result.grade,
- 'needs_revision': review_result.needs_revision
- })
-
- print(f"{indent} 评审结果: {review_result.score}/100 ({review_result.grade})")
-
- # 检查是否通过评审
- if review_result.score >= self.settings.approval_threshold:
- print(f"{indent}▸ 内容通过评审!")
- if revision_count == 0:
- self.stats['approved_first_try'] += 1
- break
-
- # 检查是否还能修改
- if revision_count >= self.settings.max_revisions:
- print(f"{indent}▸️ 达到最大修改次数 ({self.settings.max_revisions}),使用当前版本")
- break
-
- # 检查是否需要重写(分数太低)
- if review_result.score < self.settings.revision_threshold:
- print(f"{indent}▸️ 分数过低 ({review_result.score} < {self.settings.revision_threshold}),需要重写")
- self.stats['total_rewrites'] += 1
- # 重新生成内容
- new_content_data = self.writer.generate_content(
- node,
- {'review_feedback': review_result.reviewer_notes},
- level,
- additional_requirements=f"请注意避免以下问题: {review_result.reviewer_notes}"
- )
- self.stats['total_generations'] += 1
- final_content = new_content_data['content']
- else:
- # 修改内容
- print(f"{indent}▸ 根据评审意见修改内容...")
- revised_data = self.revision_agent.revise_content(
- original_content=final_content,
- review_result=review_result,
- target_word_count=target_word_count
- )
- self.stats['total_revisions'] += 1
- final_content = revised_data.get('revised_content', final_content)
-
- revision_count += 1
-
- # 构建评审元数据
- final_review = review_history[-1] if review_history else {}
- review_metadata = {
- 'review_score': final_review.get('score'),
- 'review_grade': final_review.get('grade'),
- 'review_rounds': len(review_history),
- 'review_history': review_history,
- 'reviewed': True
- }
-
- return final_content, review_metadata
-
- def _process_children(
- self,
- node: ContentNode,
- content_data: Dict[str, Any],
- context: Dict[str, Any],
- level: int,
- indent: str
- ):
- """处理子节点"""
- if content_data.get('needs_expansion') and level < self.settings.max_depth:
- subsections = content_data.get('subsections', [])
- if subsections:
- print(f"{indent}▸ 需要展开 {len(subsections)} 个子节点")
-
- for subsection in subsections:
- child = ContentNode(
- id=subsection['id'],
- title=subsection['title'],
- level=ContentLevel(level + 1),
- description=subsection['description']
- )
- node.add_child(child)
-
- # 递归写作子节点
- self._recursive_write(child, context, level + 1)
-
- def _assemble_column(
- self,
- plan: ColumnPlan,
- trees: List[ContentNode]
- ) -> Dict[str, Any]:
- """组装完整专栏"""
- articles = []
-
- for tree in trees:
- article_content = self._tree_to_markdown(tree)
-
- articles.append({
- 'id': tree.id,
- 'title': tree.title,
- 'content': article_content,
- 'metadata': tree.metadata,
- 'word_count': tree.count_words()
- })
-
- return {
- 'column_info': {
- 'title': plan.column_title,
- 'description': plan.column_description,
- 'target_audience': plan.target_audience,
- 'topic_count': plan.get_topic_count()
- },
- 'articles': articles,
- 'statistics': self._calculate_statistics(trees)
- }
-
- def _tree_to_markdown(self, node: ContentNode, depth: int = 0) -> str:
- """将内容树转换为markdown"""
- markdown = []
-
- heading_level = "#" * (depth + 1)
- markdown.append(f"{heading_level} {node.title}\n")
-
- if node.content:
- markdown.append(node.content)
- markdown.append("\n")
-
- for child in node.children:
- child_md = self._tree_to_markdown(child, depth + 1)
- markdown.append(child_md)
-
- return "\n".join(markdown)
-
- def _calculate_statistics(self, trees: List[ContentNode]) -> Dict[str, Any]:
- """计算统计信息"""
- total_words = 0
- total_nodes = 0
-
- def count_tree(node: ContentNode):
- nonlocal total_words, total_nodes
- total_nodes += 1
- total_words += len(node.content) if node.content else 0
-
- for child in node.children:
- count_tree(child)
-
- for tree in trees:
- count_tree(tree)
-
- return {
- 'total_articles': len(trees),
- 'total_nodes': total_nodes,
- 'total_words': total_words,
- 'avg_words_per_article': total_words // len(trees) if trees else 0
- }
|