1
0

12_ANPTaskDistribution.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. from hello_agents.protocols import ANPDiscovery, register_service
  2. from hello_agents import SimpleAgent, HelloAgentsLLM
  3. from hello_agents.tools.builtin import ANPTool
  4. import random
  5. from dotenv import load_dotenv
  6. load_dotenv()
  7. llm = HelloAgentsLLM()
  8. # 1. 创建服务发现中心
  9. discovery = ANPDiscovery()
  10. # 2. 注册多个计算节点
  11. for i in range(10):
  12. register_service(
  13. discovery=discovery,
  14. service_id=f"compute_node_{i}",
  15. service_name=f"计算节点{i}",
  16. service_type="compute",
  17. capabilities=["data_processing", "ml_training"],
  18. endpoint=f"http://node{i}:8000",
  19. metadata={
  20. "load": random.uniform(0.1, 0.9),
  21. "cpu_cores": random.choice([4, 8, 16]),
  22. "memory_gb": random.choice([16, 32, 64]),
  23. "gpu": random.choice([True, False])
  24. }
  25. )
  26. print(f"✅ 注册了 {len(discovery.list_all_services())} 个计算节点")
  27. # 3. 创建任务调度Agent
  28. scheduler = SimpleAgent(
  29. name="任务调度器",
  30. llm=llm,
  31. system_prompt="""你是一个智能任务调度器,负责:
  32. 1. 分析任务需求
  33. 2. 选择最合适的计算节点
  34. 3. 分配任务
  35. 选择节点时考虑:负载、CPU核心数、内存、GPU等因素。
  36. 使用 service_discovery 工具时,必须提供 action 参数:
  37. - 查看所有节点:{"action": "discover_services", "service_type": "compute"}
  38. - 获取网络统计:{"action": "get_stats"}"""
  39. )
  40. # 添加ANP工具
  41. anp_tool = ANPTool(
  42. name="service_discovery",
  43. description="服务发现工具,可以查找和选择计算节点",
  44. discovery=discovery
  45. )
  46. scheduler.add_tool(anp_tool)
  47. # 4. 智能任务分配
  48. def assign_task(task_description):
  49. print(f"\n任务:{task_description}")
  50. print("=" * 50)
  51. # 让Agent智能选择节点
  52. response = scheduler.run(f"""
  53. 请为以下任务选择最合适的计算节点:
  54. {task_description}
  55. 步骤:
  56. 1. 使用 service_discovery 工具查看所有可用的计算节点(service_type="compute")
  57. 2. 分析每个节点的特点(负载、CPU核心数、内存、GPU等)
  58. 3. 根据任务需求选择最合适的节点
  59. 4. 说明选择理由
  60. 请直接给出最终选择的节点ID和理由。
  61. """)
  62. print(response)
  63. print("=" * 50)
  64. # 测试不同类型的任务
  65. assign_task("训练一个大型深度学习模型,需要GPU支持")
  66. assign_task("处理大量文本数据,需要高内存")
  67. assign_task("运行轻量级数据分析任务")