agent_api.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. """
  2. Agent分析流式 API — 舆情分析、数据分析流式接口
  3. 实现方案路径别名见 sentiment.py、data_analysis.py(/sentiment/analyze/stream 等)。
  4. """
  5. import json
  6. import threading
  7. from fastapi import APIRouter
  8. from fastapi.responses import StreamingResponse
  9. from pydantic import BaseModel, Field
  10. from app.services import history_service
# Router for the agent-analysis endpoints; every route registered on it is
# served under the "/agent" prefix and tagged "Agent分析".
router = APIRouter(prefix="/agent", tags=["Agent分析"])
  12. class AnalysisRequest(BaseModel):
  13. stock_code: str = Field(..., description="股票代码", min_length=6, max_length=6)
  14. stock_name: str = Field("", description="股票名称")
  15. def _save_to_history(analysis_type, content, stock_code, stock_name, title):
  16. """在后台线程中保存分析历史"""
  17. import asyncio
  18. try:
  19. loop = asyncio.new_event_loop()
  20. asyncio.set_event_loop(loop)
  21. loop.run_until_complete(
  22. history_service.save_analysis(
  23. analysis_type=analysis_type,
  24. content=content,
  25. stock_code=stock_code,
  26. stock_name=stock_name,
  27. title=title,
  28. )
  29. )
  30. loop.close()
  31. except Exception:
  32. pass
  33. def _delta_text(ev: dict) -> str:
  34. return (ev.get("content") or ev.get("text") or "") if isinstance(ev, dict) else ""
  35. def iter_sentiment_analysis_ndjson(stock_code: str, stock_name: str):
  36. """生成舆情分析 NDJSON 行(字符串迭代器,含尾部换行)。"""
  37. collected_content = []
  38. try:
  39. from agents.agent_system import get_agent_system
  40. asys = get_agent_system()
  41. for event in asys.analyze_sentiment_stream(stock_code, stock_name):
  42. if event.get("type") == "delta":
  43. collected_content.append(_delta_text(event))
  44. yield json.dumps(event, ensure_ascii=False) + "\n"
  45. full_content = "".join(collected_content)
  46. if full_content:
  47. threading.Thread(
  48. target=_save_to_history,
  49. args=(
  50. "sentiment",
  51. full_content,
  52. stock_code,
  53. stock_name,
  54. f"{stock_name or stock_code} 舆情分析",
  55. ),
  56. daemon=True,
  57. ).start()
  58. except Exception as e:
  59. yield json.dumps({"type": "error", "content": f"舆情分析服务错误: {e}"}, ensure_ascii=False) + "\n"
  60. def iter_data_analysis_ndjson(stock_code: str, stock_name: str):
  61. """生成数据分析 NDJSON 行。"""
  62. collected_content = []
  63. try:
  64. from agents.agent_system import get_agent_system
  65. asys = get_agent_system()
  66. for event in asys.analyze_data_stream(stock_code, stock_name):
  67. if event.get("type") == "delta":
  68. collected_content.append(_delta_text(event))
  69. yield json.dumps(event, ensure_ascii=False) + "\n"
  70. full_content = "".join(collected_content)
  71. if full_content:
  72. threading.Thread(
  73. target=_save_to_history,
  74. args=(
  75. "data_analysis",
  76. full_content,
  77. stock_code,
  78. stock_name,
  79. f"{stock_name or stock_code} 数据分析",
  80. ),
  81. daemon=True,
  82. ).start()
  83. except Exception as e:
  84. yield json.dumps({"type": "error", "content": f"数据分析服务错误: {e}"}, ensure_ascii=False) + "\n"
  85. @router.post("/sentiment/stream")
  86. async def sentiment_stream(body: AnalysisRequest):
  87. """AI舆情分析流式接口(兼容路径)"""
  88. return StreamingResponse(
  89. iter_sentiment_analysis_ndjson(body.stock_code, body.stock_name),
  90. media_type="application/x-ndjson",
  91. headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
  92. )
  93. @router.post("/data-analysis/stream")
  94. async def data_analysis_stream(body: AnalysisRequest):
  95. """AI数据分析流式接口(兼容路径)"""
  96. return StreamingResponse(
  97. iter_data_analysis_ndjson(body.stock_code, body.stock_name),
  98. media_type="application/x-ndjson",
  99. headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
  100. )