1
0

exercise.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. from hello_agents import SimpleAgent, HelloAgentsLLM
  2. from programmer.services.problem_repository import ProblemRepository
  3. from hello_agents.tools import RAGTool
  4. import re
  5. class ExerciseAgent(SimpleAgent):
  6. """
  7. 从本地题库中筛选编程题目的智能体(RAG + LLM 决策)
  8. """
  9. def __init__(self, llm: HelloAgentsLLM):
  10. system_prompt = """
  11. 你是一位【编程题目筛选助手】。
  12. 你的职责是:
  13. - 理解用户对【难度】【知识点】【学习目标】的要求
  14. - 输出你理解的用户需要的题目难度,只有两种选择Easy或Medium
  15. ⚠️ 重要规则:
  16. - 不要生成新题目
  17. 你只需要输出Easy或Medium
  18. """
  19. super().__init__(
  20. name="Exercise",
  21. llm=llm,
  22. system_prompt=system_prompt
  23. )
  24. root_dir = r"E:\PycharmProject_lmx\HelloAgents-main\output"
  25. self.repo = ProblemRepository(root_dir)
  26. # ===== 初始化 RAG =====
  27. self.rag = RAGTool(
  28. collection_name="rag_knowledge_base",
  29. rag_namespace="problems"
  30. )
  31. # ===== 判断是否需要初始化题库 =====
  32. need_init = False
  33. try:
  34. # 尝试随便搜一个词,判断库是否为空
  35. test = self.rag.search(query="Easy", limit=1)
  36. if not test:
  37. need_init = True
  38. except Exception:
  39. # 向量库不存在 / 第一次运行
  40. need_init = True
  41. if need_init:
  42. # 第一次运行先添加题目到rag中
  43. for problem in self.repo.problems:
  44. self.rag.add_text(
  45. text=f"""
  46. Title: {problem['title']}
  47. Difficulty: {problem['difficulty']}
  48. Tags: {", ".join(problem['tags'])}
  49. Description: {problem['description'][:200]}
  50. """.strip(),
  51. document_id=problem["title"]
  52. )
  53. print("✅ 编程题目向量仓库构建完成")
  54. def run(self, input_text: str, max_tool_iterations: int = 3, **kwargs) -> str:
  55. result = super().run(input_text)
  56. # ========= RAG 语义召回 =========
  57. rag_results = self.rag.search(
  58. query=result,
  59. limit=3,
  60. min_score=0.3
  61. )
  62. titles = re.findall(r"Title:\s*(.+)", rag_results)
  63. user_problems = []
  64. # ========= 2️⃣ 本地题库精确过滤 =========
  65. for title in titles:
  66. problem = self.get_problem_by_title(title)
  67. if problem:
  68. user_problems.append(problem)
  69. if not user_problems:
  70. return "❌ 没有找到相关题目"
  71. # ========= 4️⃣ 返回标准化结果 =========
  72. return "\n\n".join(
  73. self._format_problem(problem)
  74. for problem in user_problems
  75. )
  76. # =========================================================
  77. # RAG 解析
  78. # =========================================================
  79. def get_problem_by_title(self, title: str):
  80. for problem in self.repo.problems:
  81. if problem.get("title") == title:
  82. return problem
  83. return None
  84. def _format_problem(self, problem: dict) -> str:
  85. examples_md = ""
  86. for i, ex in enumerate(problem["examples"], start=1):
  87. examples_md += f"""
  88. **Example {i}**
  89. Input: {ex["input"]}
  90. Output: {ex["output"]}
  91. """
  92. if ex["explanation"]:
  93. examples_md += f"Explanation: {ex['explanation']}\n"
  94. return f"""
  95. ### 推荐练习题:{problem['title']}
  96. **Difficulty:** {problem['difficulty']}
  97. **Tags:** {", ".join(problem['tags'])}
  98. ---
  99. ## 📘 题目描述
  100. {problem['description']}
  101. ---
  102. ## 🧪 示例
  103. {examples_md}
  104. ---
  105. ## 📌 约束条件
  106. {problem['constraints']}
  107. ---
  108. 💡 *请先尝试独立完成,不要直接查看题解。完成后可提交代码进行评审。*
  109. """