| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- """
- 10.3.4 在智能体中使用A2A工具
- (3)高级用法:Agent间协商
- """
- from hello_agents.protocols import A2AServer, A2AClient
- import threading
- import time
- # 创建两个需要协商的Agent
- agent1 = A2AServer(
- name="agent1",
- description="Agent 1"
- )
- @agent1.skill("propose")
- def handle_proposal(text: str) -> str:
- """处理协商提案"""
- import re
- import json
-
- # 解析提案
- match = re.search(r'propose\s+(.+)', text, re.IGNORECASE)
- proposal_str = match.group(1).strip() if match else text
-
- try:
- proposal = eval(proposal_str)
- task = proposal.get("task")
- deadline = proposal.get("deadline")
-
- # 评估提案
- if deadline >= 7: # 至少需要7天
- result = {"accepted": True, "message": "接受提案"}
- else:
- result = {
- "accepted": False,
- "message": "时间太紧",
- "counter_proposal": {"deadline": 7}
- }
- return str(result)
- except:
- return str({"accepted": False, "message": "无效的提案格式"})
- agent2 = A2AServer(
- name="agent2",
- description="Agent 2"
- )
- @agent2.skill("negotiate")
- def negotiate_task(text: str) -> str:
- """发起协商"""
- import re
-
- # 解析任务和截止日期
- match = re.search(r'negotiate\s+task:(.+?)\s+deadline:(\d+)', text, re.IGNORECASE)
- if match:
- task = match.group(1).strip()
- deadline = int(match.group(2))
-
- # 向agent1发送提案
- proposal = {"task": task, "deadline": deadline}
- return str({"status": "negotiating", "proposal": proposal})
- else:
- return str({"status": "error", "message": "无效的协商请求"})
- # 启动服务
- if __name__ == "__main__":
- threading.Thread(target=lambda: agent1.run(port=7000), daemon=True).start()
- threading.Thread(target=lambda: agent2.run(port=7001), daemon=True).start()
- time.sleep(2)
-
- # 测试协商流程
- client1 = A2AClient("http://localhost:7000")
- client2 = A2AClient("http://localhost:7001")
-
- # Agent2发起协商
- negotiation = client2.execute_skill("negotiate", "negotiate task:开发新功能 deadline:5")
- print(f"协商请求:{negotiation.get('result')}")
-
- # Agent1评估提案
- proposal = client1.execute_skill("propose", "propose {'task': '开发新功能', 'deadline': 5}")
- print(f"提案评估:{proposal.get('result')}")
-
- # 保持服务运行
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- print("\n服务已停止")
|