# rss_digest.py

from __future__ import annotations

import importlib
import io
import sys
from contextlib import closing, redirect_stdout
from datetime import datetime
from pathlib import Path
from time import perf_counter
from typing import Any

from backend.agents.base import BaseAgent
from backend.config import settings
from backend.events import event_logger
from backend.maintenance import cleanup_rss_artifacts
from backend.memory.base import memory_store
from backend.models import AgentRequest, AgentResponse


  16. class RSSDigestAdapter(BaseAgent):
  17. """Expose rss_digest as one information/news platform agent."""
  18. def run(self, request: AgentRequest) -> AgentResponse:
  19. event_logger.emit("agent_started", agent_id=self.agent_id, task_id=request.task_id)
  20. try:
  21. output, artifacts = self._run_with_artifacts(request)
  22. except Exception as exc:
  23. output = f"资讯员运行失败:{type(exc).__name__}: {exc}"
  24. artifacts = {"error": str(exc), "error_type": type(exc).__name__}
  25. print(f"[rss_digest][error] {output}")
  26. memory_store.add(self.agent_id, f"input={request.input} output={output}")
  27. event = event_logger.emit(
  28. "agent_completed",
  29. agent_id=self.agent_id,
  30. task_id=request.task_id,
  31. payload={
  32. "output_preview": output[:200],
  33. "artifact_keys": sorted(artifacts.keys()),
  34. },
  35. )
  36. return AgentResponse(
  37. agent_id=self.agent_id,
  38. output=output,
  39. artifacts=artifacts,
  40. events=[event],
  41. )
  42. def _run(self, request: AgentRequest) -> str:
  43. output, _ = self._run_with_artifacts(request)
  44. return output
  45. def _run_with_artifacts(self, request: AgentRequest) -> tuple[str, dict[str, Any]]:
  46. root_dir = Path(settings.rss_digest_root).resolve()
  47. data_root = Path(settings.rss_digest_data_root).resolve()
  48. cleanup_stats = cleanup_rss_artifacts()
  49. print(f"[rss_digest] start {datetime.now().isoformat(timespec='seconds')} input={request.input[:80]}")
  50. if not root_dir.exists():
  51. message = f"rss_digest 项目路径不存在,无法运行资讯员:{root_dir}"
  52. print(f"[rss_digest][error] {message}")
  53. return message, {
  54. "ready": False,
  55. "rss_digest_root": str(root_dir),
  56. "rss_digest_data_root": str(data_root),
  57. "cleanup": cleanup_stats,
  58. }
  59. if request.context.get("mode") == "group_chat":
  60. digest_path = self._latest_digest_path(data_root)
  61. print("[rss_digest] skipped: group_chat guard")
  62. if digest_path:
  63. return (
  64. f"资讯员已就绪。最新 RSS 简报:{digest_path}",
  65. {
  66. "skipped": True,
  67. "reason": "batch_guard",
  68. "digest_path": str(digest_path),
  69. "rss_digest_data_root": str(data_root),
  70. "cleanup": cleanup_stats,
  71. },
  72. )
  73. return (
  74. "资讯员是较长耗时流程。请单独使用 @rss_digest 生成或更新 RSS 中文简报。",
  75. {"skipped": True, "reason": "batch_guard", "cleanup": cleanup_stats},
  76. )
  77. if request.context.get("dry_run"):
  78. print("[rss_digest] dry_run ok")
  79. return (
  80. "资讯员已接入 rss_digest,真实运行会拉取 RSS、生成中文摘要并输出 HTML 简报。",
  81. {
  82. "ready": True,
  83. "rss_digest_root": str(root_dir),
  84. "rss_digest_data_root": str(data_root),
  85. "cleanup": cleanup_stats,
  86. },
  87. )
  88. modules = self._load_rss_modules(root_dir)
  89. force_refresh = bool(request.context.get("force_refresh")) or self._is_force_refresh(request.input)
  90. today_digest_path = self._today_digest_path(data_root)
  91. if today_digest_path and not force_refresh:
  92. print("[rss_digest] skipped: today digest exists")
  93. recent_articles = self._recent_articles(root_dir, data_root, modules, limit=8)
  94. digest_url = self._digest_url(today_digest_path)
  95. run_stats = {
  96. "skipped": True,
  97. "reason": "today_digest_exists",
  98. "digest_article_count": len(recent_articles),
  99. "llm_enabled": True,
  100. }
  101. return self._format_output(today_digest_path, digest_url, recent_articles, run_stats), {
  102. "skipped": True,
  103. "reason": "today_digest_exists",
  104. "rss_digest_root": str(root_dir),
  105. "rss_digest_data_root": str(data_root),
  106. "digest_path": str(today_digest_path),
  107. "digest_url": digest_url,
  108. "recent_articles": recent_articles,
  109. "run_stats": run_stats,
  110. "cleanup": cleanup_stats,
  111. }
  112. stdout_buffer = io.StringIO()
  113. print("[rss_digest] running pipeline")
  114. started = perf_counter()
  115. with redirect_stdout(stdout_buffer):
  116. run_stats = modules["pipeline"].run_pipeline(root_dir, data_root)
  117. run_stats["adapter_total_seconds"] = round(perf_counter() - started, 3)
  118. print(
  119. "[rss_digest] complete "
  120. f"discovered={run_stats.get('discovered', 0)} "
  121. f"extracted={run_stats.get('extracted', 0)} "
  122. f"summarized={run_stats.get('summarized', 0)} "
  123. f"digest_articles={run_stats.get('digest_article_count', 0)} "
  124. f"seconds={run_stats.get('adapter_total_seconds')}"
  125. )
  126. digest_path = self._latest_digest_path(data_root)
  127. digest_url = self._digest_url(digest_path)
  128. recent_articles = self._recent_articles(root_dir, data_root, modules, limit=8)
  129. output = self._format_output(digest_path, digest_url, recent_articles, run_stats)
  130. artifacts = {
  131. "rss_digest_root": str(root_dir),
  132. "rss_digest_data_root": str(data_root),
  133. "digest_path": str(digest_path) if digest_path else None,
  134. "digest_url": digest_url,
  135. "recent_articles": recent_articles,
  136. "run_stats": run_stats,
  137. "stdout": stdout_buffer.getvalue().strip(),
  138. "cleanup": cleanup_stats,
  139. }
  140. return output, artifacts
  141. @staticmethod
  142. def _load_rss_modules(root_dir: Path) -> dict[str, Any]:
  143. src_dir = root_dir / "src"
  144. src_text = str(src_dir)
  145. if src_text not in sys.path:
  146. sys.path.insert(0, src_text)
  147. return {
  148. "pipeline": importlib.import_module("rss_digest.pipeline"),
  149. "config": importlib.import_module("rss_digest.config"),
  150. "db": importlib.import_module("rss_digest.db"),
  151. }
  152. @staticmethod
  153. def _latest_digest_path(data_root: Path) -> Path | None:
  154. digest_dir = data_root / "runs" / "digests"
  155. files = sorted(digest_dir.glob("digest_*.html"), key=lambda path: path.stat().st_mtime, reverse=True)
  156. return files[0] if files else None
  157. @staticmethod
  158. def _today_digest_path(data_root: Path) -> Path | None:
  159. digest_path = data_root / "runs" / "digests" / f"digest_{datetime.now().strftime('%Y-%m-%d')}.html"
  160. return digest_path if digest_path.exists() else None
  161. @staticmethod
  162. def _is_force_refresh(text: str) -> bool:
  163. normalized = text.lower()
  164. return any(token in normalized for token in ("强制", "重新生成", "刷新", "force", "refresh"))
  165. @staticmethod
  166. def _digest_url(digest_path: Path | None) -> str | None:
  167. if not digest_path:
  168. return None
  169. return f"/rss-digests/{digest_path.name}"
  170. @staticmethod
  171. def _recent_articles(root_dir: Path, data_root: Path, modules: dict[str, Any], limit: int) -> list[dict[str, Any]]:
  172. cfg = modules["config"].build_config(root_dir, data_root)
  173. conn = modules["db"].connect(cfg.db_path)
  174. modules["db"].init_db(conn)
  175. rows = modules["db"].get_recent_articles(conn, limit=limit)
  176. return [
  177. {
  178. "title": row.get("title", ""),
  179. "source_name": row.get("source_name", ""),
  180. "category": row.get("category", ""),
  181. "published_at": row.get("published_at", ""),
  182. "link": row.get("link", ""),
  183. "article_score": row.get("article_score"),
  184. "one_line": row.get("one_line"),
  185. "worth_reading": row.get("worth_reading"),
  186. }
  187. for row in rows
  188. ]
  189. @staticmethod
  190. def _format_output(
  191. digest_path: Path | None,
  192. digest_url: str | None,
  193. articles: list[dict[str, Any]],
  194. run_stats: dict[str, Any] | None,
  195. ) -> str:
  196. lines = ["资讯员已完成 RSS 更新和中文摘要生成。"]
  197. if run_stats:
  198. lines.append(
  199. "本轮统计:"
  200. f"RSS新增 {run_stats.get('discovered', 0)},"
  201. f"正文抽取 {run_stats.get('extracted', 0)},"
  202. f"LLM摘要 {run_stats.get('summarized', 0)},"
  203. f"本次简报文章 {run_stats.get('digest_article_count', 0)},"
  204. f"LLM启用 {run_stats.get('llm_enabled', False)}。"
  205. )
  206. if run_stats.get("no_new_articles"):
  207. lines.append("提示:本次没有新的未读文章进入简报,已避免重复展示今天看过的内容。")
  208. if run_stats.get("llm_enabled") and run_stats.get("summarized", 0) == 0:
  209. lines.append("提示:LLM 已配置,但本轮没有成功摘要新文章,可查看任务 artifacts 中的 stdout 和 run_stats。")
  210. if not run_stats.get("llm_enabled"):
  211. lines.append("提示:LLM 未启用,请检查 .env 中的 LLM_MODEL_ID、LLM_API_KEY、LLM_BASE_URL。")
  212. if digest_path:
  213. lines.append(f"最新 HTML 简报:{digest_path}")
  214. if digest_url:
  215. lines.append(f"点击打开:{digest_url}")
  216. if articles:
  217. lines.append("")
  218. lines.append("最新文章:")
  219. for index, article in enumerate(articles[:5], start=1):
  220. title = article.get("title") or "未命名文章"
  221. source = article.get("source_name") or "未知来源"
  222. score = article.get("article_score")
  223. score_text = f",评分 {score}" if score is not None else ""
  224. lines.append(f"{index}. {title},{source}{score_text}")
  225. one_line = article.get("one_line")
  226. if one_line:
  227. lines.append(f" {one_line}")
  228. return "\n".join(lines)