# observability_views.py
"""
Stage 3: build observability views (timeline / summary) from persisted runs,
for use by GET .../observability.
"""
  4. from __future__ import annotations
  5. from typing import Any, Dict, List, Optional
  6. def build_diet_observability(row: Dict[str, Any]) -> Dict[str, Any]:
  7. """`get_diet_run` 返回的 row。"""
  8. out = row.get("output") or {}
  9. steps: List[Dict[str, Any]] = row.get("steps_trace") or []
  10. timeline: List[Dict[str, Any]] = []
  11. for s in steps:
  12. ph = s.get("phase")
  13. if ph == "tool_prefetch":
  14. tools = s.get("tools") or []
  15. timeline.append(
  16. {
  17. "phase": ph,
  18. "tool_calls": len(tools),
  19. "tools_ok": [bool(t.get("ok")) for t in tools],
  20. }
  21. )
  22. elif ph in ("nutritionist", "coach", "habit"):
  23. ats = s.get("attempts") or []
  24. timeline.append(
  25. {
  26. "phase": ph,
  27. "fallback_used": bool(s.get("fallback_used")),
  28. "llm_attempts": len(ats),
  29. "last_attempt_ok": any(a.get("ok") for a in ats) if ats else False,
  30. }
  31. )
  32. else:
  33. timeline.append({"phase": ph or "unknown", "raw_keys": list(s.keys())})
  34. mp = out.get("meal_plan") or {}
  35. items = mp.get("items") or []
  36. return {
  37. "kind": "diet",
  38. "run_id": row.get("run_id"),
  39. "user_id": row.get("user_id"),
  40. "created_at": row.get("created_at"),
  41. "replayed_from_run_id": row.get("replayed_from_run_id")
  42. or out.get("replayed_from"),
  43. "schema_version": out.get("schema_version"),
  44. "pipeline_mode": out.get("pipeline_mode"),
  45. "degraded": out.get("degraded"),
  46. "errors": out.get("errors") or [],
  47. "rag_debug": out.get("rag_debug") or {},
  48. "trace_timeline": timeline,
  49. "input_snapshot": row.get("input"),
  50. "meal_plan_item_count": len(items),
  51. "estimated_total_protein_g": mp.get("total_est_protein_g"),
  52. "replay": {
  53. "supported": True,
  54. "method": "POST",
  55. "path_template": "/api/diet/runs/{run_id}/replay",
  56. "note": "使用同一份 input 重新跑流水线,生成新 run_id;Mock 工具确定性较高,LLM 输出仍可能不同。",
  57. },
  58. }
  59. def build_report_observability(
  60. row: Dict[str, Any], *, include_raw_trace: bool = False
  61. ) -> Dict[str, Any]:
  62. """`get_report_run` 返回的 row。默认只返回摘要,避免 trace 过大。"""
  63. trace = row.get("agent_trace")
  64. summary: Dict[str, Any] = {}
  65. if isinstance(trace, dict):
  66. for agent_name, events in trace.items():
  67. if isinstance(events, list):
  68. summary[agent_name] = {
  69. "event_count": len(events),
  70. "last_titles": [e.get("title") for e in events[-5:] if isinstance(e, dict)],
  71. }
  72. else:
  73. summary[agent_name] = {"event_count": 0}
  74. out: Dict[str, Any] = {
  75. "kind": "health_report",
  76. "task_id": row.get("task_id"),
  77. "user_id": row.get("user_id"),
  78. "created_at": row.get("created_at"),
  79. "summary_text_preview": (row.get("summary_text") or "")[:240] or None,
  80. "has_agent_trace": bool(trace),
  81. "agent_trace_summary": summary,
  82. }
  83. if include_raw_trace:
  84. out["agent_trace"] = trace
  85. return out