fancy hace 8 meses
padre
commit
10a45a165b

+ 11 - 20
code/chapter14/helloagents-deepresearch/backend/src/deep_researcher/app/agents/deep_research_agent.py

@@ -7,7 +7,7 @@ import re
 from pathlib import Path
 from queue import Empty, Queue
 from threading import Lock, Thread
-from typing import Any, Callable, Iterator, Optional
+from typing import Any, Callable, Iterator
 
 from hello_agents import HelloAgentsLLM
 from hello_agents.tools import ToolRegistry
@@ -34,6 +34,7 @@ class DeepResearchAgent:
     """Coordinator orchestrating TODO-based research workflow using HelloAgents."""
 
     def __init__(self, config: Configuration | None = None) -> None:
+        """Initialise the coordinator with configuration and shared tools."""
         self.config = config or Configuration.from_env()
         self.llm = self._init_llm()
 
@@ -78,7 +79,6 @@ class DeepResearchAgent:
     # ------------------------------------------------------------------
     def _init_llm(self) -> HelloAgentsLLM:
         """Instantiate HelloAgentsLLM following configuration preferences."""
-
         llm_kwargs: dict[str, Any] = {"temperature": 0.0}
 
         model_id = self.config.llm_model_id or self.config.local_llm
@@ -109,7 +109,6 @@ class DeepResearchAgent:
 
     def _create_tool_aware_agent(self, *, name: str, system_prompt: str) -> ToolAwareSimpleAgent:
         """Instantiate a ToolAwareSimpleAgent sharing tool registry and tracker."""
-
         return ToolAwareSimpleAgent(
             name=name,
             llm=self.llm,
@@ -119,15 +118,13 @@ class DeepResearchAgent:
             tool_call_listener=self._tool_tracker.record,
         )
 
-    def _set_tool_event_sink(self, sink: Optional[Callable[[dict[str, Any]], None]]) -> None:
+    def _set_tool_event_sink(self, sink: Callable[[dict[str, Any]], None] | None) -> None:
         """Enable or disable immediate tool event callbacks."""
-
         self._tool_event_sink_enabled = sink is not None
         self._tool_tracker.set_event_sink(sink)
 
     def run(self, topic: str) -> SummaryStateOutput:
         """Execute the research workflow and return the final report."""
-
         state = SummaryState(research_topic=topic)
         state.todo_items = self.planner.plan_todo_list(state)
         self._drain_tool_events(state)
@@ -153,7 +150,6 @@ class DeepResearchAgent:
 
     def run_stream(self, topic: str) -> Iterator[dict[str, Any]]:
         """Execute the workflow yielding incremental progress events."""
-
         state = SummaryState(research_topic=topic)
         logger.debug("Starting streaming research: topic=%s", topic)
         yield {"type": "status", "message": "初始化研究流程"}
@@ -181,8 +177,8 @@ class DeepResearchAgent:
         def enqueue(
             event: dict[str, Any],
             *,
-            task: Optional[TodoItem] = None,
-            step_override: Optional[int] = None,
+            task: TodoItem | None = None,
+            step_override: int | None = None,
         ) -> None:
             payload = dict(event)
             target_task_id = payload.get("task_id")
@@ -300,7 +296,6 @@ class DeepResearchAgent:
         step: int | None = None,
     ) -> Iterator[dict[str, Any]]:
         """Run search + summarization for a single task."""
-
         task.status = "in_progress"
 
         search_result, notices, answer_text, backend = dispatch_search(
@@ -362,7 +357,7 @@ class DeepResearchAgent:
             state.sources_gathered.append(sources_summary)
             state.research_loop_count += 1
 
-        summary_text: Optional[str] = None
+        summary_text: str | None = None
 
         if emit_stream:
             for event in self._drain_tool_events(state, step=step):
@@ -422,10 +417,9 @@ class DeepResearchAgent:
         self,
         state: SummaryState,
         *,
-        step: Optional[int] = None,
+        step: int | None = None,
     ) -> list[dict[str, Any]]:
         """Proxy to the shared tool call tracker."""
-
         events = self._tool_tracker.drain(state, step=step)
         if self._tool_event_sink_enabled:
             return []
@@ -434,12 +428,10 @@ class DeepResearchAgent:
     @property
     def _tool_call_events(self) -> list[dict[str, Any]]:
         """Expose recorded tool events for legacy integrations."""
-
         return self._tool_tracker.as_dicts()
 
     def _serialize_task(self, task: TodoItem) -> dict[str, Any]:
         """Convert task dataclass to serializable dict for frontend."""
-
         return {
             "id": task.id,
             "title": task.title,
@@ -453,7 +445,7 @@ class DeepResearchAgent:
             "stream_token": task.stream_token,
         }
 
-    def _persist_final_report(self, state: SummaryState, report: str) -> Optional[dict[str, Any]]:
+    def _persist_final_report(self, state: SummaryState, report: str) -> dict[str, Any] | None:
         if not self.note_tool or not report or not report.strip():
             return None
 
@@ -511,7 +503,7 @@ class DeepResearchAgent:
 
         return payload
 
-    def _find_existing_report_note_id(self, state: SummaryState) -> Optional[str]:
+    def _find_existing_report_note_id(self, state: SummaryState) -> str | None:
         if state.report_note_id:
             return state.report_note_id
 
@@ -543,7 +535,7 @@ class DeepResearchAgent:
         return None
 
     @staticmethod
-    def _extract_note_id_from_text(response: str) -> Optional[str]:
+    def _extract_note_id_from_text(response: str) -> str | None:
         if not response:
             return None
 
@@ -554,8 +546,7 @@ class DeepResearchAgent:
         return match.group(1).strip()
 
 
-def run_deep_research(topic: str, config: Optional[Configuration] = None) -> SummaryStateOutput:
+def run_deep_research(topic: str, config: Configuration | None = None) -> SummaryStateOutput:
     """Convenience function mirroring the class-based API."""
-
     agent = DeepResearchAgent(config=config)
     return agent.run(topic)

+ 40 - 57
code/chapter14/helloagents-deepresearch/backend/src/deep_researcher/app/services/search_service.py

@@ -1,25 +1,23 @@
-"""Search dispatching helpers."""
+"""Search dispatch helpers leveraging HelloAgents SearchTool."""
 
 from __future__ import annotations
 
 import logging
 from typing import Any, Optional, Tuple
 
+from hello_agents.tools import SearchTool
+
 from ...configuration import Configuration
 from ...utils import (
-    advanced_search,
     deduplicate_and_format_sources,
-    duckduckgo_search,
     format_sources,
     get_config_value,
-    perplexity_search,
-    searxng_search,
-    tavily_search,
 )
 
 logger = logging.getLogger(__name__)
 
 MAX_TOKENS_PER_SOURCE = 2000
+_GLOBAL_SEARCH_TOOL = SearchTool(backend="hybrid")
 
 
 def dispatch_search(
@@ -27,70 +25,56 @@ def dispatch_search(
     config: Configuration,
     loop_count: int,
 ) -> Tuple[dict[str, Any] | None, list[str], Optional[str], str]:
-    """Call the configured search backend and normalize the response."""
+    """Execute configured search backend and normalise response payload."""
 
     search_api = get_config_value(config.search_api)
-    notices: list[str] = []
-    answer_text: Optional[str] = None
-    backend_label = search_api
-
-    if search_api == "tavily":
-        result = tavily_search(
-            query,
-            fetch_full_page=config.fetch_full_page,
-            max_results=5,
-        )
-    elif search_api == "perplexity":
-        result = perplexity_search(
-            query,
-            perplexity_search_loop_count=loop_count,
-        )
-    elif search_api == "duckduckgo":
-        result = duckduckgo_search(
-            query,
-            max_results=5,
-            fetch_full_page=config.fetch_full_page,
-        )
-    elif search_api == "searxng":
-        result = searxng_search(
-            query,
-            max_results=5,
-            fetch_full_page=config.fetch_full_page,
-        )
-    elif search_api == "advanced":
-        result = advanced_search(
-            query,
-            fetch_full_page=config.fetch_full_page,
+
+    try:
+        raw_response = _GLOBAL_SEARCH_TOOL.run(
+            {
+                "input": query,
+                "backend": search_api,
+                "mode": "structured",
+                "fetch_full_page": config.fetch_full_page,
+                "max_results": 5,
+                "max_tokens_per_source": MAX_TOKENS_PER_SOURCE,
+                "loop_count": loop_count,
+            }
         )
-        if isinstance(result, dict):
-            notices = list(result.get("notices") or [])
-            answer_text = result.get("answer")
-            backend_label = str(result.get("backend") or "advanced")
+    except Exception as exc:  # pragma: no cover - defensive logging
+        logger.exception("Search backend %s failed: %s", search_api, exc)
+        raise
+
+    if isinstance(raw_response, str):
+        notices = [raw_response]
+        logger.warning("Search backend %s returned text notice: %s", search_api, raw_response)
+        payload: dict[str, Any] = {
+            "results": [],
+            "backend": search_api,
+            "answer": None,
+            "notices": notices,
+        }
     else:
-        raise ValueError(f"Unsupported search API: {config.search_api}")
-
-    if answer_text is None and isinstance(result, dict):
-        answer_text = result.get("answer")
+        payload = raw_response
+        notices = list(payload.get("notices") or [])
 
-    if isinstance(result, dict):
-        results_len = len(result.get("results", []))
-    elif isinstance(result, list):
-        results_len = len(result)
-    else:
-        results_len = "?"
+    backend_label = str(payload.get("backend") or search_api)
+    answer_text = payload.get("answer")
+    results = payload.get("results", [])
 
     if notices:
         for notice in notices:
             logger.info("Search notice (%s): %s", backend_label, notice)
+
     logger.info(
         "Search backend=%s resolved_backend=%s answer=%s results=%s",
         search_api,
         backend_label,
         bool(answer_text),
-        results_len,
+        len(results),
     )
 
-    return result, notices, answer_text, backend_label
+    return payload, notices, answer_text, backend_label
 
 
 def prepare_research_context(
@@ -98,11 +82,11 @@ def prepare_research_context(
     answer_text: Optional[str],
     config: Configuration,
 ) -> tuple[str, str]:
-    """Format sources and research context for downstream summarization."""
+    """Build structured context and source summary for downstream agents."""
 
     sources_summary = format_sources(search_result)
     context = deduplicate_and_format_sources(
-        search_result,
+        search_result or {"results": []},
         max_tokens_per_source=MAX_TOKENS_PER_SOURCE,
         fetch_full_page=config.fetch_full_page,
     )
@@ -111,4 +95,3 @@ def prepare_research_context(
         context = f"AI直接答案:\n{answer_text}\n\n{context}"
 
     return sources_summary, context
-

+ 43 - 496
code/chapter14/helloagents-deepresearch/backend/src/deep_researcher/utils.py

@@ -1,53 +1,24 @@
-import os
-import logging
-import httpx
-import requests
-from typing import Dict, Any, List, Union, Optional
-
-from markdownify import markdownify
-from langsmith import traceable
-from tavily import TavilyClient
-from ddgs import DDGS
-from ddgs.exceptions import DDGSException
+"""Utility helpers shared across deep researcher services."""
 
+from __future__ import annotations
 
-logger = logging.getLogger(__name__)
+import logging
+from typing import Any, Dict, List, Union
 
-# Constants
 CHARS_PER_TOKEN = 4
 
+logger = logging.getLogger(__name__)
 
-def get_config_value(value: Any) -> str:
-    """
-    Convert configuration values to string format, handling both string and enum types.
-
-    Args:
-        value (Any): The configuration value to process. Can be a string or an Enum.
 
-    Returns:
-        str: The string representation of the value.
+def get_config_value(value: Any) -> str:
+    """Return configuration value as plain string."""
 
-    Examples:
-        >>> get_config_value("tavily")
-        'tavily'
-        >>> get_config_value(SearchAPI.TAVILY)
-        'tavily'
-    """
     return value if isinstance(value, str) else value.value
 
 
 def strip_thinking_tokens(text: str) -> str:
-    """
-    Remove <think> and </think> tags and their content from the text.
-
-    Iteratively removes all occurrences of content enclosed in thinking tokens.
+    """Remove ``<think>`` sections from model responses."""
 
-    Args:
-        text (str): The text to process
-
-    Returns:
-        str: The text with thinking tokens and their content removed
-    """
     while "<think>" in text and "</think>" in text:
         start = text.find("<think>")
         end = text.find("</think>") + len("</think>")
@@ -56,482 +27,58 @@ def strip_thinking_tokens(text: str) -> str:
 
 
 def deduplicate_and_format_sources(
-    search_response: Union[Dict[str, Any], List[Dict[str, Any]]],
+    search_response: Dict[str, Any] | List[Dict[str, Any]],
     max_tokens_per_source: int,
+    *,
     fetch_full_page: bool = False,
 ) -> str:
-    """
-    Format and deduplicate search responses from various search APIs.
-
-    Takes either a single search response or list of responses from search APIs,
-    deduplicates them by URL, and formats them into a structured string.
+    """Format and deduplicate search results for downstream prompting."""
 
-    Args:
-        search_response (Union[Dict[str, Any], List[Dict[str, Any]]]): Either:
-            - A dict with a 'results' key containing a list of search results
-            - A list of dicts, each containing search results
-        max_tokens_per_source (int): Maximum number of tokens to include for each source's content
-        fetch_full_page (bool, optional): Whether to include the full page content. Defaults to False.
-
-    Returns:
-        str: Formatted string with deduplicated sources
-
-    Raises:
-        ValueError: If input is neither a dict with 'results' key nor a list of search results
-    """
-    # Convert input to list of results
     if isinstance(search_response, dict):
-        sources_list = search_response["results"]
-    elif isinstance(search_response, list):
-        sources_list = []
-        for response in search_response:
-            if isinstance(response, dict) and "results" in response:
-                sources_list.extend(response["results"])
-            else:
-                sources_list.extend(response)
+        sources_list = search_response.get("results", [])
     else:
-        raise ValueError(
-            "Input must be either a dict with 'results' or a list of search results"
-        )
+        sources_list = search_response
 
-    # Deduplicate by URL
-    unique_sources = {}
+    unique_sources: dict[str, Dict[str, Any]] = {}
     for source in sources_list:
-        if source["url"] not in unique_sources:
-            unique_sources[source["url"]] = source
+        url = source.get("url")
+        if not url:
+            continue
+        if url not in unique_sources:
+            unique_sources[url] = source
+
+    formatted_parts: List[str] = []
+    for source in unique_sources.values():
+        title = source.get("title") or source.get("url", "")
+        content = source.get("content", "")
+        formatted_parts.append(f"信息来源: {title}\n\n")
+        formatted_parts.append(f"URL: {source.get('url', '')}\n\n")
+        formatted_parts.append(f"信息内容: {content}\n\n")
 
-    # Format output text
-    formatted_text = ""
-    for i, source in enumerate(unique_sources.values(), 1):
-        formatted_text += f"信息来源: {source['title']}\n\n"
-        formatted_text += f"URL: {source['url']}\n\n"
-        formatted_text += (
-            f"信息内容: {source['content']}\n\n"
-        )
         if fetch_full_page:
-            # Using rough estimate of characters per token
-            char_limit = max_tokens_per_source * CHARS_PER_TOKEN
-            # Handle None raw_content
-            raw_content = source.get("raw_content", "")
+            raw_content = source.get("raw_content")
             if raw_content is None:
+                logger.debug("raw_content missing for %s", source.get("url", ""))
                 raw_content = ""
-                print(f"Warning: No raw_content found for source {source['url']}")
+            char_limit = max_tokens_per_source * CHARS_PER_TOKEN
             if len(raw_content) > char_limit:
-                raw_content = raw_content[:char_limit] + "... [truncated]"
-            formatted_text += f"详细信息内容限制为 {max_tokens_per_source} 个 token: {raw_content}\n\n"
-
-    return formatted_text.strip()
-
-
-def format_sources(search_results: Dict[str, Any]) -> str:
-    """Format search results into a bullet-point list of sources with URLs.
-
-    Creates a simple bulleted list of search results with title and URL for each source.
-
-    Args:
-        search_results (Dict[str, Any]): Search response containing a 'results' key with
-                                        a list of search result objects
-
-    Returns:
-        str: Formatted string with sources as bullet points in the format "* title : url"
-    """
-    return "\n".join(
-        f"* {source['title']} : {source['url']}" for source in search_results["results"]
-    )
-
-
-def fetch_raw_content(url: str) -> Optional[str]:
-    """
-    Fetch HTML content from a URL and convert it to markdown format.
-
-    Uses a 10-second timeout to avoid hanging on slow sites or large pages.
-
-    Args:
-        url (str): The URL to fetch content from
-
-    Returns:
-        Optional[str]: The fetched content converted to markdown if successful,
-                      None if any error occurs during fetching or conversion
-    """
-    try:
-        # Create a client with reasonable timeout
-        with httpx.Client(timeout=10.0) as client:
-            response = client.get(url)
-            response.raise_for_status()
-            return markdownify(response.text)
-    except Exception as e:
-        print(f"Warning: Failed to fetch full page content for {url}: {str(e)}")
-        return None
-
-
-@traceable
-def duckduckgo_search(
-    query: str, max_results: int = 3, fetch_full_page: bool = False
-) -> Dict[str, List[Dict[str, Any]]]:
-    """Search the web using DuckDuckGo and return formatted results.
-
-    Uses the DDGS library to perform web searches through DuckDuckGo.
-
-    Args:
-        query (str): The search query to execute
-        max_results (int, optional): Maximum number of results to return. Defaults to 3.
-        fetch_full_page (bool, optional): Whether to fetch full page content from result URLs.
-                                         Defaults to False.
-
-    Returns:
-        Dict[str, List[Dict[str, Any]]]: Search response containing:
-            - results (list): List of search result dictionaries, each containing:
-                - title (str): Title of the search result
-                - url (str): URL of the search result
-                - content (str): Snippet/summary of the content
-                - raw_content (str or None): Full page content if fetch_full_page is True,
-                                            otherwise same as content
-    """
-    try:
-        with DDGS(timeout=10) as client:
-            search_results = client.text(
-                query,
-                max_results=max_results,
-                backend="duckduckgo",
+                raw_content = f"{raw_content[:char_limit]}... [truncated]"
+            formatted_parts.append(
+                f"详细信息内容限制为 {max_tokens_per_source} 个 token: {raw_content}\n\n"
             )
 
-        results: list[dict[str, Any]] = []
-        for entry in search_results:
-            url = entry.get("href") or entry.get("url")
-            title = entry.get("title") or url
-            content = entry.get("body") or entry.get("content")
-
-            if not all([url, title, content]):
-                print(f"Warning: Incomplete result from DuckDuckGo: {entry}")
-                continue
-
-            raw_content = content
-            if fetch_full_page:
-                fetched = fetch_raw_content(url)
-                raw_content = fetched if fetched is not None else content
-
-            results.append(
-                {
-                    "title": title,
-                    "url": url,
-                    "content": content,
-                    "raw_content": raw_content,
-                }
-            )
-
-        return {"results": results}
-    except DDGSException as exc:
-        print(f"Error in DuckDuckGo search: {str(exc)}")
-        print("Full error details: DDGSException")
-        return {"results": []}
-    except Exception as exc:  # pragma: no cover - defensive
-        print(f"Unexpected error in DuckDuckGo search: {str(exc)}")
-        print(f"Full error details: {type(exc).__name__}")
-        return {"results": []}
-
-
-@traceable
-def searxng_search(
-    query: str, max_results: int = 3, fetch_full_page: bool = False
-) -> Dict[str, List[Dict[str, Any]]]:
-    """
-    Search the web using SearXNG and return formatted results.
-
-    Uses the SearXNG JSON API (`/search?format=json`) to执行检索。
-    The SearXNG host URL is read from the SEARXNG_URL environment variable
-    or defaults to http://localhost:8888.
-
-    Args:
-        query (str): The search query to execute
-        max_results (int, optional): Maximum number of results to return. Defaults to 3.
-        fetch_full_page (bool, optional): Whether to fetch full page content from result URLs.
-                                         Defaults to False.
+    return "".join(formatted_parts).strip()
 
-    Returns:
-        Dict[str, List[Dict[str, Any]]]: Search response containing:
-            - results (list): List of search result dictionaries, each containing:
-                - title (str): Title of the search result
-                - url (str): URL of the search result
-                - content (str): Snippet/summary of the content
-                - raw_content (str or None): Full page content if fetch_full_page is True,
-                                           otherwise same as content
-    """
-    host = os.environ.get("SEARXNG_URL", "http://localhost:8888")
-    endpoint = f"{host.rstrip('/')}/search"
 
-    try:
-        response = requests.get(
-            endpoint,
-            params={
-                "q": query,
-                "format": "json",
-                "language": "zh-CN",
-                "safesearch": 1,
-                "categories": "general",
-            },
-            timeout=10,
-        )
-        response.raise_for_status()
-        payload = response.json()
-    except Exception as exc:  # pragma: no cover - 远程接口失败兜底
-        logger.warning("SearXNG request failed: %s", exc)
-        return {"results": []}
+def format_sources(search_results: Dict[str, Any] | None) -> str:
+    """Return bullet list summarising search sources."""
 
-    results = []
-    for entry in payload.get("results", [])[:max_results]:
-        url = entry.get("url") or entry.get("link")
-        title = entry.get("title") or url
-        content = entry.get("content") or entry.get("snippet") or ""
+    if not search_results:
+        return ""
 
-        if not all([url, title]) or not content:
-            logger.debug("Skipping incomplete SearXNG result: %s", entry)
-            continue
-
-        raw_content = content
-        if fetch_full_page:
-            fetched = fetch_raw_content(url)
-            raw_content = fetched if fetched is not None else content
-
-        results.append(
-            {
-                "title": title,
-                "url": url,
-                "content": content,
-                "raw_content": raw_content,
-            }
-        )
-
-    return {"results": results}
-
-
-@traceable
-def tavily_search(
-    query: str, fetch_full_page: bool = True, max_results: int = 3
-) -> Dict[str, List[Dict[str, Any]]]:
-    """
-    Search the web using the Tavily API and return formatted results.
-
-    Uses the TavilyClient to perform searches. Tavily API key must be configured
-    in the environment.
-
-    Args:
-        query (str): The search query to execute
-        fetch_full_page (bool, optional): Whether to include raw content from sources.
-                                         Defaults to True.
-        max_results (int, optional): Maximum number of results to return. Defaults to 3.
-
-    Returns:
-        Dict[str, List[Dict[str, Any]]]: Search response containing:
-            - results (list): List of search result dictionaries, each containing:
-                - title (str): Title of the search result
-                - url (str): URL of the search result
-                - content (str): Snippet/summary of the content
-                - raw_content (str or None): Full content of the page if available and
-                                            fetch_full_page is True
-    """
-
-    tavily_client = TavilyClient()
-    return tavily_client.search(
-        query, max_results=max_results, include_raw_content=fetch_full_page
-    )
-
-
-@traceable
-def advanced_search(query: str, fetch_full_page: bool = False) -> Dict[str, Any]:
-    """利用多源策略执行搜索,优先 Tavily,其次 SerpApi,最后 DuckDuckGo。"""
-
-    notices: list[str] = []
-    results: list[dict[str, Any]] = []
-    answer: Optional[str] = None
-    backend = "advanced"
-
-    # 优先尝试 Tavily
-    tavily_key = os.getenv("TAVILY_API_KEY")
-    if tavily_key:
-        try:
-            tavily_result = tavily_search(
-                query,
-                fetch_full_page=fetch_full_page,
-                max_results=5,
-            )
-            if tavily_result.get("results"):
-                backend = "tavily"
-                answer = tavily_result.get("answer")
-                results.extend(tavily_result["results"])
-                logger.info("advanced_search: using Tavily results for query='%s'", query)
-                return {
-                    "results": results,
-                    "notices": notices,
-                    "answer": answer,
-                    "backend": backend,
-                }
-            notices.append("⚠️ Tavily 未返回有效结果,尝试其他搜索源")
-            logger.info("advanced_search: Tavily returned no results for query='%s'", query)
-        except Exception as exc:  # pragma: no cover - 第三方库防御
-            notices.append(f"⚠️ Tavily 搜索失败:{exc}")
-            logger.warning("advanced_search: Tavily failed for query='%s': %s", query, exc)
-    else:
-        notices.append("⚠️ 未检测到 TAVILY_API_KEY,跳过 Tavily 搜索")
-        logger.info("advanced_search: Tavily disabled for query='%s'", query)
-
-    # 其次尝试 SerpApi
-    serpapi_key = os.getenv("SERPAPI_API_KEY")
-    if serpapi_key:
-        try:
-            from serpapi import GoogleSearch  # type: ignore
-
-            params = {
-                "engine": "google",
-                "q": query,
-                "api_key": serpapi_key,
-                "gl": "cn",
-                "hl": "zh-cn",
-                "num": 5,
-            }
-
-            client = GoogleSearch(params)
-            response = client.get_dict()
-
-            answer_box = response.get("answer_box") or {}
-            direct_answer = answer_box.get("answer") or answer_box.get("snippet")
-            if direct_answer:
-                answer = direct_answer
-
-            organic_results = response.get("organic_results", [])
-            for item in organic_results[:5]:
-                results.append(
-                    {
-                        "title": item.get("title") or item.get("link") or query,
-                        "url": item.get("link", ""),
-                        "content": item.get("snippet") or item.get("title") or "",
-                        "raw_content": item.get("snippet") or "",
-                    }
-                )
-
-            if results:
-                backend = "serpapi"
-                logger.info("advanced_search: using SerpApi results for query='%s'", query)
-                return {
-                    "results": results,
-                    "notices": notices,
-                    "answer": answer,
-                    "backend": backend,
-                }
-
-            notices.append("⚠️ SerpApi 未返回有效结果,回退到通用搜索")
-            logger.info("advanced_search: SerpApi returned no results for query='%s'", query)
-        except ImportError:
-            notices.append("⚠️ SerpApi 库未安装,跳过 SerpApi 搜索 (pip install google-search-results)")
-            logger.warning("advanced_search: serpapi package missing, skip query='%s'", query)
-        except Exception as exc:  # pragma: no cover - 第三方库防御
-            notices.append(f"⚠️ SerpApi 搜索失败:{exc}")
-            logger.warning("advanced_search: SerpApi failed for query='%s': %s", query, exc)
-    else:
-        notices.append("⚠️ 未检测到 SERPAPI_API_KEY,跳过 SerpApi 搜索")
-        logger.info("advanced_search: SerpApi disabled for query='%s'", query)
-
-    # 最后回退到 DuckDuckGo(无需额外配置)
-    try:
-        ddg_result = duckduckgo_search(
-            query,
-            max_results=5,
-            fetch_full_page=fetch_full_page,
-        )
-        if ddg_result.get("results"):
-            backend = "duckduckgo"
-            results.extend(ddg_result["results"])
-            logger.info("advanced_search: using DuckDuckGo results for query='%s'", query)
-        else:
-            notices.append("⚠️ DuckDuckGo 未返回有效结果")
-            logger.info("advanced_search: DuckDuckGo returned no results for query='%s'", query)
-    except Exception as exc:  # pragma: no cover - 第三方库防御
-        notices.append(f"⚠️ DuckDuckGo 搜索失败:{exc}")
-        logger.warning("advanced_search: DuckDuckGo failed for query='%s': %s", query, exc)
-
-    return {
-        "results": results,
-        "notices": notices,
-        "answer": answer,
-        "backend": backend,
-    }
-
-
-@traceable
-def perplexity_search(
-    query: str, perplexity_search_loop_count: int = 0
-) -> Dict[str, Any]:
-    """
-    Search the web using the Perplexity API and return formatted results.
-
-    Uses the Perplexity API to perform searches with the 'sonar-pro' model.
-    Requires a PERPLEXITY_API_KEY environment variable to be set.
-
-    Args:
-        query (str): The search query to execute
-        perplexity_search_loop_count (int, optional): The loop step for perplexity search
-                                                     (used for source labeling). Defaults to 0.
-
-    Returns:
-        Dict[str, Any]: Search response containing:
-            - results (list): List of search result dictionaries, each containing:
-                - title (str): Title of the search result (includes search counter)
-                - url (str): URL of the citation source
-                - content (str): Content of the response or reference to main content
-                - raw_content (str or None): Full content for the first source, None for additional
-                                            citation sources
-
-    Raises:
-        requests.exceptions.HTTPError: If the API request fails
-    """
-
-    headers = {
-        "accept": "application/json",
-        "content-type": "application/json",
-        "Authorization": f"Bearer {os.getenv('PERPLEXITY_API_KEY')}",
-    }
-
-    payload = {
-        "model": "sonar-pro",
-        "messages": [
-            {
-                "role": "system",
-                "content": "Search the web and provide factual information with sources.",
-            },
-            {"role": "user", "content": query},
-        ],
-    }
-
-    response = requests.post(
-        "https://api.perplexity.ai/chat/completions", headers=headers, json=payload
+    results = search_results.get("results", [])
+    return "\n".join(
+        f"* {item.get('title', item.get('url', ''))} : {item.get('url', '')}"
+        for item in results
+        if item.get("url")
     )
-    response.raise_for_status()  # Raise exception for bad status codes
-
-    # Parse the response
-    data = response.json()
-    content = data["choices"][0]["message"]["content"]
-
-    # Perplexity returns a list of citations for a single search result
-    citations = data.get("citations", ["https://perplexity.ai"])
-
-    # Return first citation with full content, others just as references
-    results = [
-        {
-            "title": f"Perplexity Search {perplexity_search_loop_count + 1}, Source 1",
-            "url": citations[0],
-            "content": content,
-            "raw_content": content,
-        }
-    ]
-
-    # Add additional citations without duplicating content
-    for i, citation in enumerate(citations[1:], start=2):
-        results.append(
-            {
-                "title": f"Perplexity Search {perplexity_search_loop_count + 1}, Source {i}",
-                "url": citation,
-                "content": "See above for full content",
-                "raw_content": None,
-            }
-        )
-
-    return {"results": results}

+ 13 - 13
docs/chapter14/第十四章 自动化深度研究智能体.md

@@ -32,10 +32,10 @@
 
 ### 14.1.2 整体能力与用户价值
 
-`helloagents-deepresearch` 项目将后端HelloAgents智能体、可配置搜索适配器与前端可视化界面结合,形成「输入主题→实时观察→获取总结」的闭环体验。整体亮点如下:
+`helloagents-deepresearch` 项目将后端HelloAgents智能体、HelloAgents 内置工具体系与前端可视化界面结合,形成「输入主题→实时观察→获取总结」的闭环体验。整体亮点如下:
 
-- <strong>多提供者模型接入</strong>:支持 Ollama、LMStudio 或自定义 OpenAI 兼容服务,自主选择推理能力与成本。
-- <strong>多搜索源融合</strong>:内置 DuckDuckGo、Tavily、Perplexity、SearXNG 适配器,灵活切换,甚至可以按轮次混合使用
+- <strong>多提供者模型接入</strong>:支持 Ollama、LMStudio,或通过 `LLM_PROVIDER=custom` 自定义任意 OpenAI 兼容服务,自主选择推理能力与成本。
+- <strong>多搜索源融合</strong>:直接复用 HelloAgents 的 `SearchTool`,内置 Tavily、SerpApi、DuckDuckGo、SearXNG、Perplexity 以及高级混合策略,可按需切换。
 - <strong>流式反馈</strong>:后端通过 Server-Sent Events 推送各阶段结果,前端即时展示时间线、最新来源和逐字更新的总结。
 - <strong>配置优先级清晰</strong>:环境变量、代码默认值分层管理,方便调试与部署。
 系统采用经典的<strong>前后端分离架构</strong>,分为四个层次,如图14.1所示。
@@ -52,7 +52,7 @@ graph LR
     Config[Configuration.from_env]
     subgraph Workflow[DeepResearchAgent 工作流]
       Planner{{PlanningService\n任务规划}}
-      Search{{dispatch_search\nprepare_research_context}}
+      Search{{dispatch_search\n(SearchTool)}}
       Summarizer{{SummarizationService\n任务总结}}
       Reporter{{ReportingService\n报告整合}}
       Tracker[[ToolCallTracker\n工具事件]]
@@ -63,7 +63,7 @@ graph LR
     NoteTool[(NoteTool\nToolRegistry)]
   end
   subgraph External[外部依赖]
-    SearchAPI[(Tavily / Perplexity /\nDuckDuckGo / SearxNG / Advanced)]
+    SearchAPI[(HelloAgents SearchTool\nTavily · SerpApi · DuckDuckGo ·\nSearXNG · Perplexity · Advanced)]
     Notes[(本地笔记\nnotes_workspace)]
   end
 
@@ -210,7 +210,7 @@ graph TD
 - `agents/deep_research_agent.py`:顶层协调者,负责任务规划、并行执行与报告沉淀。
 - `services/planner_service.py`、`summarization_service.py`、`reporting_service.py`:分别封装计划、总结、报告逻辑,内部都复用了 `ToolAwareSimpleAgent`。
 - `services/tool_events.py`:跟踪 `note` 工具调用,把事件转换成 SSE,可见第九章 `NoteTool` 的集成成果。
-- `services/search_service.py`:统一封装多源搜索结果,与第七章的自定义搜索工具同样遵循“结果字典”约定
+- `services/search_service.py`:作为 HelloAgents `SearchTool` 的薄包装,将结构化搜索结果与上下文整理给下游 Agent。
 - `configuration.py`、`api.py`:负责配置加载、HelloAgentsLLM 初始化与 HTTP 层的流式推送。
 
 数据流转顺序为:
@@ -275,7 +275,7 @@ class Configuration(BaseModel):
         return cls(**raw_values)
 ```
 
-配置解析逻辑(`backend/src/deep_researcher/configuration.py:18`)先读取所有大写环境变量,再应用显式别名,最后才合并 API 请求的覆盖值。实际运行时意味着:
+配置解析逻辑(`backend/src/deep_researcher/configuration.py:18`)先读取所有大写环境变量,再应用显式别名,最后才合并 API 请求的覆盖值。需要注意的是,从本章起 HelloAgents 原生 `HelloAgentsLLM` 已支持 `LLM_PROVIDER=custom`:只要同时提供 `LLM_BASE_URL` 与 `LLM_API_KEY` 即可连接任意 OpenAI 兼容服务,而 `LOCAL_LLM`/`OLLAMA_BASE_URL` 等字段继续服务于本地模型场景。实际运行时意味着:
 
 - `.env` 或系统环境变量拥有最高优先级,便于本地调试和生产部署。
 - LangGraph/UI 提交的临时参数通过 `overrides` 注入,不会修改全局环境。
@@ -283,14 +283,14 @@ class Configuration(BaseModel):
 
 ### 14.2.3 自定义多源搜索工具
 
-为了适应不同团队的检索与合规要求,`helloagents-deepresearch` 的搜索层设计成可插拔结构。除了内置的 DuckDuckGo、Tavily、Perplexity、SearXNG,我们还可以像第七章那样编写自己的多源工具,再通过配置切换到 `advanced` 模式使用。建议按如下步骤操作
+为了适应不同团队的检索与合规要求,本项目直接复用 HelloAgents 框架的 `SearchTool`。这一工具现已支持 Tavily、SerpApi、DuckDuckGo、SearXNG、Perplexity 以及「advanced」混合策略,并且同一份结构化返回可以被多个 Agent 共享。因此在深度研究项目里,我们只需要按需配置即可:
 
-1. **复用示例代码**:参考 `code/chapter7/my_advanced_search.py` 中的 `MyAdvancedSearchTool` 类和 `create_advanced_search_registry()` 工厂函数。该示例演示了如何根据环境变量自动检测 Tavily、SerpApi 等后端,并在执行时做降级提示
-2. **本地验证**:运行 `code/chapter7/test_advanced_search.py` 可以快速检查自定义工具是否按预期返回结果;若未配置任何 API key,它会输出友好的诊断信息,便于调试
-3. **接入深度研究项目**:在 `backend/src/deep_researcher/utils.py` 中新增一个适配函数(示例实现见函数 `advanced_search`),内部直接调用你自定义的工具,再将返回的文本整理为统一的 `{"results": [...], "notices": [...]}` 结构,方便后续的去重与摘要逻辑复用。
-4. **启用新工具**:将后端配置中的 `SEARCH_API` 设置为 `advanced`,或在前端表单里选择该选项;深度研究 Agent 会自动透传工具的降级提示与直接答案,并在前端时间线中显示,确保用户知晓检索来源和失败原因
+1. **选择后端**:将 `SEARCH_API` 设为 `tavily`、`serpapi`、`duckduckgo`、`searxng`、`perplexity` 或 `advanced`。其中 `advanced` 也可以写作 `hybrid`,两者等价,都会优先尝试 Tavily/SerpApi,再降级到 DuckDuckGo。
+2. **配置密钥**:根据选定后端设置 `TAVILY_API_KEY`、`SERPAPI_API_KEY`、`PERPLEXITY_API_KEY` 等环境变量;若只想使用无密钥的 DuckDuckGo/SearXNG,可不设置。
+3. **结构化输出**:`SearchTool` 默认返回友好的文本描述,当我们在 `services/search_service.py` 中传入 `mode=structured` 时,会得到统一的 `{"results": [...], "answer": ..., "notices": [...]}` 结构,方便后续做去重、裁剪、引用。
+4. **深度定制(可选)**:如果还需要扩展新的搜索后端,可以在 HelloAgents 仓库内直接继承 `SearchTool` 并覆盖 `_search_xxx` 方法,或者提交 PR 将新后端合入框架。由于深度研究项目只是框架的“薄封装”,一旦上游合并,所有下游应用都会自动获得能力升级。
 
-通过这种方式,你可以把任意内部/垂直领域的检索能力接入到深度研究工作流里,同时保持与核心流程的高度解耦
+这种设计让学习者无需在项目中维护额外的搜索适配器,即可通过配置或框架升级获得最新能力;同时也保留了在企业环境内扩展私有搜索源的灵活度。
 
 > 提示:配置项 `ENABLE_NOTES` 默认为 `true`。当启用时,后端会为每个任务自动同步一份 Markdown 笔记(目录由 `NOTES_WORKSPACE` 指定),并把 `note` 工具挂载到所有 Agent,方便它们在需要时对笔记做增删改查。流式事件会附带 `note_id`,前端可据此展示或跳转对应笔记。