indexers.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. """
  2. 将 SQLite 中的历史记录写入 Milvus 向量索引。
  3. """
  4. from __future__ import annotations
  5. import hashlib
  6. from typing import Any, Dict, List, Optional
  7. from memory.store import (
  8. get_diet_reflect,
  9. get_diet_run,
  10. get_report_run,
  11. list_user_memory_chunks_sql,
  12. )
  13. from rag.embedding import embed_texts
  14. from rag.milvus_store import upsert_chunks
  15. def _chunk_id(source_type: str, source_id: str, text: str) -> str:
  16. h = hashlib.sha1(text.encode("utf-8")).hexdigest()[:16]
  17. return f"{source_type}:{source_id}:{h}"
  18. def _to_chunk(
  19. user_id: str,
  20. source_type: str,
  21. source_id: str,
  22. text: str,
  23. created_at: str | None = None,
  24. ) -> Dict[str, Any]:
  25. return {
  26. "chunk_id": _chunk_id(source_type, source_id, text),
  27. "user_id": user_id,
  28. "source_type": source_type,
  29. "source_id": source_id,
  30. "text": text[:8000],
  31. "created_at": created_at or "",
  32. }
  33. def _embed_and_upsert(chunks: List[Dict[str, Any]]) -> int:
  34. if not chunks:
  35. return 0
  36. vecs = embed_texts([c["text"] for c in chunks])
  37. for c, v in zip(chunks, vecs):
  38. c["vector"] = v
  39. return upsert_chunks(chunks)
  40. def index_report_run(task_id: str) -> int:
  41. row = get_report_run(task_id)
  42. if not row:
  43. return 0
  44. txt = row.get("summary_text") or ""
  45. if not txt:
  46. report = row.get("report") or {}
  47. report_in = report.get("report") if isinstance(report, dict) else {}
  48. txt = (report_in or {}).get("summary") or ""
  49. if not txt:
  50. return 0
  51. chunk = _to_chunk(
  52. user_id=row["user_id"],
  53. source_type="report_summary",
  54. source_id=row["task_id"],
  55. text=txt,
  56. created_at=row.get("created_at"),
  57. )
  58. return _embed_and_upsert([chunk])
  59. def index_diet_run(run_id: str) -> int:
  60. row = get_diet_run(run_id)
  61. if not row:
  62. return 0
  63. output = row.get("output") or {}
  64. mp = (output.get("meal_plan") or {}) if isinstance(output, dict) else {}
  65. hints = (output.get("habit_extras") or {}).get("execution_hints", [])
  66. items = mp.get("items") or []
  67. txt = ";".join(
  68. [f"{it.get('name')} {it.get('portion')} {it.get('why','')}" for it in items if isinstance(it, dict)]
  69. )
  70. if hints:
  71. txt += "\n执行提示:" + ";".join([str(x) for x in hints])
  72. if not txt:
  73. txt = str(output)[:2000]
  74. chunk = _to_chunk(
  75. user_id=row["user_id"],
  76. source_type="diet_plan",
  77. source_id=row["run_id"],
  78. text=txt,
  79. created_at=row.get("created_at"),
  80. )
  81. return _embed_and_upsert([chunk])
  82. def index_reflect_event(reflect_id: int | str) -> int:
  83. row = get_diet_reflect(int(reflect_id))
  84. if not row:
  85. return 0
  86. txt = f"执行={row['followed']} 原因={row.get('reason_code') or '-'} 说明={row.get('reason_detail') or ''}"
  87. chunk = _to_chunk(
  88. user_id=row["user_id"],
  89. source_type="diet_reflect",
  90. source_id=str(row["id"]),
  91. text=txt,
  92. created_at=row.get("created_at"),
  93. )
  94. return _embed_and_upsert([chunk])
  95. def reindex_user(user_id: str, limit: int = 200) -> int:
  96. rows = list_user_memory_chunks_sql(user_id=user_id, limit=limit)
  97. chunks = [
  98. _to_chunk(
  99. user_id=r["user_id"],
  100. source_type=r["source_type"],
  101. source_id=r["source_id"],
  102. text=r["text"],
  103. created_at=r.get("created_at"),
  104. )
  105. for r in rows
  106. if r.get("text")
  107. ]
  108. return _embed_and_upsert(chunks)