utils.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. """Utility helpers shared across deep researcher services."""
  2. from __future__ import annotations
  3. import logging
  4. from typing import Any, Dict, List, Union
  5. CHARS_PER_TOKEN = 4
  6. logger = logging.getLogger(__name__)
  7. def get_config_value(value: Any) -> str:
  8. """Return configuration value as plain string."""
  9. return value if isinstance(value, str) else value.value
  10. def strip_thinking_tokens(text: str) -> str:
  11. """Remove ``<think>`` sections from model responses."""
  12. while "<think>" in text and "</think>" in text:
  13. start = text.find("<think>")
  14. end = text.find("</think>") + len("</think>")
  15. text = text[:start] + text[end:]
  16. return text
  17. def deduplicate_and_format_sources(
  18. search_response: Dict[str, Any] | List[Dict[str, Any]],
  19. max_tokens_per_source: int,
  20. *,
  21. fetch_full_page: bool = False,
  22. ) -> str:
  23. """Format and deduplicate search results for downstream prompting."""
  24. if isinstance(search_response, dict):
  25. sources_list = search_response.get("results", [])
  26. else:
  27. sources_list = search_response
  28. unique_sources: dict[str, Dict[str, Any]] = {}
  29. for source in sources_list:
  30. url = source.get("url")
  31. if not url:
  32. continue
  33. if url not in unique_sources:
  34. unique_sources[url] = source
  35. formatted_parts: List[str] = []
  36. for source in unique_sources.values():
  37. title = source.get("title") or source.get("url", "")
  38. content = source.get("content", "")
  39. formatted_parts.append(f"信息来源: {title}\n\n")
  40. formatted_parts.append(f"URL: {source.get('url', '')}\n\n")
  41. formatted_parts.append(f"信息内容: {content}\n\n")
  42. if fetch_full_page:
  43. raw_content = source.get("raw_content")
  44. if raw_content is None:
  45. logger.debug("raw_content missing for %s", source.get("url", ""))
  46. raw_content = ""
  47. char_limit = max_tokens_per_source * CHARS_PER_TOKEN
  48. if len(raw_content) > char_limit:
  49. raw_content = f"{raw_content[:char_limit]}... [truncated]"
  50. formatted_parts.append(
  51. f"详细信息内容限制为 {max_tokens_per_source} 个 token: {raw_content}\n\n"
  52. )
  53. return "".join(formatted_parts).strip()
  54. def format_sources(search_results: Dict[str, Any] | None) -> str:
  55. """Return bullet list summarising search sources."""
  56. if not search_results:
  57. return ""
  58. results = search_results.get("results", [])
  59. return "\n".join(
  60. f"* {item.get('title', item.get('url', ''))} : {item.get('url', '')}"
  61. for item in results
  62. if item.get("url")
  63. )