09_A2A_Server.py 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. """
  2. 10.3.3 使用 HelloAgents A2A 工具
  3. (1)创建A2A Agent服务端
  4. """
  5. from hello_agents.protocols import A2AServer
  6. import threading
  7. import time
  8. # 创建研究员Agent服务
  9. researcher = A2AServer(
  10. name="researcher",
  11. description="负责搜索和分析资料的Agent",
  12. version="1.0.0"
  13. )
  14. # 定义技能
  15. @researcher.skill("research")
  16. def handle_research(text: str) -> str:
  17. """处理研究请求"""
  18. import re
  19. match = re.search(r'research\s+(.+)', text, re.IGNORECASE)
  20. topic = match.group(1).strip() if match else text
  21. # 实际的研究逻辑(这里简化)
  22. result = {
  23. "topic": topic,
  24. "findings": f"关于{topic}的研究结果...",
  25. "sources": ["来源1", "来源2", "来源3"]
  26. }
  27. return str(result)
  28. # 在后台启动服务
  29. def start_server():
  30. researcher.run(host="localhost", port=5000)
  31. if __name__ == "__main__":
  32. server_thread = threading.Thread(target=start_server, daemon=True)
  33. server_thread.start()
  34. print("✅ 研究员Agent服务已启动在 http://localhost:5000")
  35. # 保持程序运行
  36. try:
  37. while True:
  38. time.sleep(1)
  39. except KeyboardInterrupt:
  40. print("\n服务已停止")