"""FastAPI entrypoint exposing the DeepResearchAgent via HTTP.""" from __future__ import annotations import json import sys from typing import Any, Dict, Iterator, Optional from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from loguru import logger from pydantic import BaseModel, Field from config import Configuration, SearchAPI from agent import DeepResearchAgent # 添加控制台日志处理程序 logger.add( sys.stderr, level="INFO", format="{time:YYYY-MM-DD HH:mm:ss} | {level: <4} | using_function:{function} | {file}:{line} | {message}", colorize=True, ) # 添加错误日志文件处理程序 logger.add( sink=sys.stderr, level="ERROR", format="{time:YYYY-MM-DD HH:mm:ss} | {level: <4} | using_function:{function} | {file}:{line} | {message}", colorize=True, ) class ResearchRequest(BaseModel): """Payload for triggering a research run.""" topic: str = Field(..., description="Research topic supplied by the user") search_api: SearchAPI | None = Field( default=None, description="Override the default search backend configured via env", ) class ResearchResponse(BaseModel): """HTTP response containing the generated report and structured tasks.""" report_markdown: str = Field( ..., description="Markdown-formatted research report including sections" ) todo_items: list[dict[str, Any]] = Field( default_factory=list, description="Structured TODO items with summaries and sources", ) def _mask_secret(value: Optional[str], visible: int = 4) -> str: """Mask sensitive tokens while keeping leading and trailing characters.""" if not value: return "unset" if len(value) <= visible * 2: return "*" * len(value) return f"{value[:visible]}...{value[-visible:]}" def _build_config(payload: ResearchRequest) -> Configuration: overrides: Dict[str, Any] = {} if payload.search_api is not None: overrides["search_api"] = payload.search_api return Configuration.from_env(overrides=overrides) def create_app() -> FastAPI: app = FastAPI(title="HelloAgents Deep Researcher") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.on_event("startup") def log_startup_configuration() -> None: config = Configuration.from_env() if config.llm_provider == "ollama": base_url = config.sanitized_ollama_url() elif config.llm_provider == "lmstudio": base_url = config.lmstudio_base_url else: base_url = config.llm_base_url or "unset" logger.info( "DeepResearch configuration loaded: provider=%s model=%s base_url=%s search_api=%s " "max_loops=%s fetch_full_page=%s tool_calling=%s strip_thinking=%s api_key=%s", config.llm_provider, config.resolved_model() or "unset", base_url, (config.search_api.value if isinstance(config.search_api, SearchAPI) else config.search_api), config.max_web_research_loops, config.fetch_full_page, config.use_tool_calling, config.strip_thinking_tokens, _mask_secret(config.llm_api_key), ) @app.get("/healthz") def health_check() -> Dict[str, str]: return {"status": "ok"} @app.post("/research", response_model=ResearchResponse) def run_research(payload: ResearchRequest) -> ResearchResponse: try: config = _build_config(payload) agent = DeepResearchAgent(config=config) result = agent.run(payload.topic) except ValueError as exc: # Likely due to unsupported configuration raise HTTPException(status_code=400, detail=str(exc)) from exc except Exception as exc: # pragma: no cover - defensive guardrail raise HTTPException(status_code=500, detail="Research failed") from exc todo_payload = [ { "id": item.id, "title": item.title, "intent": item.intent, "query": item.query, "status": item.status, "summary": item.summary, "sources_summary": item.sources_summary, "note_id": item.note_id, "note_path": item.note_path, } for item in result.todo_items ] return ResearchResponse( report_markdown=(result.report_markdown or result.running_summary or ""), todo_items=todo_payload, ) @app.post("/research/stream") def stream_research(payload: ResearchRequest) -> StreamingResponse: try: config = _build_config(payload) agent = DeepResearchAgent(config=config) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc def event_iterator() -> Iterator[str]: try: for event in agent.run_stream(payload.topic): yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n" except Exception as exc: # pragma: no cover - defensive guardrail logger.exception("Streaming research failed") error_payload = {"type": "error", "detail": str(exc)} yield f"data: {json.dumps(error_payload, ensure_ascii=False)}\n\n" return StreamingResponse( event_iterator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", }, ) return app app = create_app() if __name__ == "__main__": import uvicorn uvicorn.run( "main:app", host="0.0.0.0", port=8000, reload=True, log_level="info" )