maintenance.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. from __future__ import annotations
  2. import shutil
  3. from datetime import datetime, timedelta
  4. from pathlib import Path
  5. from typing import Any
  6. from backend.config import ROOT_DIR, settings
# Time of the most recent approved cleanup, keyed by task name. Written by
# _should_run to throttle repeated runs; as far as this module shows it is held
# in memory only, so the throttle starts fresh in every process.
_last_cleanup: dict[str, datetime] = {}
  8. def cleanup_deep_research_artifacts(*, force: bool = False) -> dict[str, Any]:
  9. """Remove old deep research run artifacts.
  10. This intentionally does not delete notes. Notes are indexed memory artifacts,
  11. while runs are reproducible per-execution files that can grow quickly.
  12. """
  13. if not _should_run("deep_research", force=force):
  14. return {"skipped": True, "reason": "interval"}
  15. run_root = _resolve_workspace(settings.run_workspace)
  16. stats = _cleanup_children(
  17. run_root,
  18. retention_days=settings.research_run_retention_days,
  19. file_patterns=["*"],
  20. delete_dirs=True,
  21. )
  22. stats.update(
  23. {
  24. "skipped": False,
  25. "target": str(run_root),
  26. "retention_days": settings.research_run_retention_days,
  27. }
  28. )
  29. return stats
  30. def cleanup_rss_artifacts(*, force: bool = False) -> dict[str, Any]:
  31. """Remove old RSS generated files while keeping article state intact."""
  32. if not _should_run("rss_digest", force=force):
  33. return {"skipped": True, "reason": "interval"}
  34. data_root = Path(settings.rss_digest_data_root).resolve() / "runs"
  35. totals = {"deleted_files": 0, "deleted_dirs": 0, "deleted_bytes": 0}
  36. for relative, retention_days, patterns in (
  37. ("digests", settings.rss_digest_retention_days, ["digest_*.html"]),
  38. ("raw", settings.rss_cache_retention_days, ["*"]),
  39. ("extracted", settings.rss_cache_retention_days, ["*"]),
  40. ("translated", settings.rss_cache_retention_days, ["*"]),
  41. ):
  42. stats = _cleanup_children(
  43. data_root / relative,
  44. retention_days=retention_days,
  45. file_patterns=patterns,
  46. delete_dirs=False,
  47. )
  48. for key in totals:
  49. totals[key] += stats[key]
  50. totals.update(
  51. {
  52. "skipped": False,
  53. "target": str(data_root),
  54. "digest_retention_days": settings.rss_digest_retention_days,
  55. "cache_retention_days": settings.rss_cache_retention_days,
  56. }
  57. )
  58. return totals
  59. def _should_run(name: str, *, force: bool) -> bool:
  60. if not settings.maintenance_cleanup_enabled:
  61. return False
  62. now = datetime.now()
  63. last_run = _last_cleanup.get(name)
  64. interval = timedelta(hours=max(settings.maintenance_cleanup_interval_hours, 1))
  65. if not force and last_run and now - last_run < interval:
  66. return False
  67. _last_cleanup[name] = now
  68. return True
  69. def _resolve_workspace(value: str) -> Path:
  70. path = Path(value)
  71. if not path.is_absolute():
  72. path = ROOT_DIR / path
  73. path.mkdir(parents=True, exist_ok=True)
  74. return path.resolve()
  75. def _cleanup_children(
  76. root: Path,
  77. *,
  78. retention_days: int,
  79. file_patterns: list[str],
  80. delete_dirs: bool,
  81. ) -> dict[str, int]:
  82. stats = {"deleted_files": 0, "deleted_dirs": 0, "deleted_bytes": 0}
  83. if retention_days <= 0 or not root.exists():
  84. return stats
  85. root = root.resolve()
  86. cutoff = datetime.now() - timedelta(days=retention_days)
  87. if delete_dirs:
  88. for child in root.iterdir():
  89. if not child.exists() or not _is_child_of(child, root):
  90. continue
  91. if datetime.fromtimestamp(child.stat().st_mtime) >= cutoff:
  92. continue
  93. if child.is_dir():
  94. stats["deleted_bytes"] += _directory_size(child)
  95. shutil.rmtree(child)
  96. stats["deleted_dirs"] += 1
  97. elif child.is_file():
  98. stats["deleted_bytes"] += child.stat().st_size
  99. child.unlink()
  100. stats["deleted_files"] += 1
  101. return stats
  102. for pattern in file_patterns:
  103. for path in root.glob(pattern):
  104. if not path.is_file() or not _is_child_of(path, root):
  105. continue
  106. if datetime.fromtimestamp(path.stat().st_mtime) >= cutoff:
  107. continue
  108. stats["deleted_bytes"] += path.stat().st_size
  109. path.unlink()
  110. stats["deleted_files"] += 1
  111. return stats
  112. def _is_child_of(path: Path, root: Path) -> bool:
  113. try:
  114. path.resolve().relative_to(root.resolve())
  115. return True
  116. except ValueError:
  117. return False
  118. def _directory_size(path: Path) -> int:
  119. return sum(item.stat().st_size for item in path.rglob("*") if item.is_file())