"""FastAPI entrypoint exposing the DeepResearchAgent via HTTP."""
from __future__ import annotations
import json
import sys
from typing import Any, Dict, Iterator, Optional
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from loguru import logger
from pydantic import BaseModel, Field
from config import Configuration, SearchAPI
from agent import DeepResearchAgent
# 添加控制台日志处理程序
logger.add(
sys.stderr,
level="INFO",
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <4} | using_function:{function} | {file}:{line} | {message}",
colorize=True,
)
# 添加错误日志文件处理程序
logger.add(
sink=sys.stderr,
level="ERROR",
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <4} | using_function:{function} | {file}:{line} | {message}",
colorize=True,
)
class ResearchRequest(BaseModel):
"""Payload for triggering a research run."""
topic: str = Field(..., description="Research topic supplied by the user")
search_api: SearchAPI | None = Field(
default=None,
description="Override the default search backend configured via env",
)
class ResearchResponse(BaseModel):
"""HTTP response containing the generated report and structured tasks."""
report_markdown: str = Field(
..., description="Markdown-formatted research report including sections"
)
todo_items: list[dict[str, Any]] = Field(
default_factory=list,
description="Structured TODO items with summaries and sources",
)
def _mask_secret(value: Optional[str], visible: int = 4) -> str:
"""Mask sensitive tokens while keeping leading and trailing characters."""
if not value:
return "unset"
if len(value) <= visible * 2:
return "*" * len(value)
return f"{value[:visible]}...{value[-visible:]}"
def _build_config(payload: ResearchRequest) -> Configuration:
overrides: Dict[str, Any] = {}
if payload.search_api is not None:
overrides["search_api"] = payload.search_api
return Configuration.from_env(overrides=overrides)
def create_app() -> FastAPI:
app = FastAPI(title="HelloAgents Deep Researcher")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.on_event("startup")
def log_startup_configuration() -> None:
config = Configuration.from_env()
if config.llm_provider == "ollama":
base_url = config.sanitized_ollama_url()
elif config.llm_provider == "lmstudio":
base_url = config.lmstudio_base_url
else:
base_url = config.llm_base_url or "unset"
logger.info(
"DeepResearch configuration loaded: provider=%s model=%s base_url=%s search_api=%s "
"max_loops=%s fetch_full_page=%s tool_calling=%s strip_thinking=%s api_key=%s",
config.llm_provider,
config.resolved_model() or "unset",
base_url,
(config.search_api.value if isinstance(config.search_api, SearchAPI) else config.search_api),
config.max_web_research_loops,
config.fetch_full_page,
config.use_tool_calling,
config.strip_thinking_tokens,
_mask_secret(config.llm_api_key),
)
@app.get("/healthz")
def health_check() -> Dict[str, str]:
return {"status": "ok"}
@app.post("/research", response_model=ResearchResponse)
def run_research(payload: ResearchRequest) -> ResearchResponse:
try:
config = _build_config(payload)
agent = DeepResearchAgent(config=config)
result = agent.run(payload.topic)
except ValueError as exc: # Likely due to unsupported configuration
raise HTTPException(status_code=400, detail=str(exc)) from exc
except Exception as exc: # pragma: no cover - defensive guardrail
raise HTTPException(status_code=500, detail="Research failed") from exc
todo_payload = [
{
"id": item.id,
"title": item.title,
"intent": item.intent,
"query": item.query,
"status": item.status,
"summary": item.summary,
"sources_summary": item.sources_summary,
"note_id": item.note_id,
"note_path": item.note_path,
}
for item in result.todo_items
]
return ResearchResponse(
report_markdown=(result.report_markdown or result.running_summary or ""),
todo_items=todo_payload,
)
@app.post("/research/stream")
def stream_research(payload: ResearchRequest) -> StreamingResponse:
try:
config = _build_config(payload)
agent = DeepResearchAgent(config=config)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
def event_iterator() -> Iterator[str]:
try:
for event in agent.run_stream(payload.topic):
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
except Exception as exc: # pragma: no cover - defensive guardrail
logger.exception("Streaming research failed")
error_payload = {"type": "error", "detail": str(exc)}
yield f"data: {json.dumps(error_payload, ensure_ascii=False)}\n\n"
return StreamingResponse(
event_iterator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)
return app
app = create_app()
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=True,
log_level="info"
)