async_executor.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. """异步工具执行器 - HelloAgents异步工具执行支持"""
  2. import asyncio
  3. import concurrent.futures
  4. from typing import Dict, Any, List
  5. from .registry import ToolRegistry
  6. class AsyncToolExecutor:
  7. """异步工具执行器"""
  8. def __init__(self, registry: ToolRegistry, max_workers: int = 4):
  9. self.registry = registry
  10. self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
  11. async def execute_tool_async(self, tool_name: str, input_data: str) -> str:
  12. """异步执行单个工具"""
  13. loop = asyncio.get_event_loop()
  14. def _execute():
  15. return self.registry.execute_tool(tool_name, input_data)
  16. try:
  17. result = await loop.run_in_executor(self.executor, _execute)
  18. return result
  19. except Exception as e:
  20. return f"❌ 工具 '{tool_name}' 异步执行失败: {e}"
  21. async def execute_tools_parallel(self, tasks: List[Dict[str, str]]) -> List[Dict[str, Any]]:
  22. """
  23. 并行执行多个工具
  24. Args:
  25. tasks: 任务列表,每个任务包含 tool_name 和 input_data
  26. Returns:
  27. 执行结果列表,包含任务信息和结果
  28. """
  29. print(f"🚀 开始并行执行 {len(tasks)} 个工具任务")
  30. # 创建异步任务
  31. async_tasks = []
  32. for i, task in enumerate(tasks):
  33. tool_name = task.get("tool_name")
  34. input_data = task.get("input_data", "")
  35. if not tool_name:
  36. continue
  37. print(f"📝 创建任务 {i+1}: {tool_name}")
  38. async_task = self.execute_tool_async(tool_name, input_data)
  39. async_tasks.append((i, task, async_task))
  40. # 等待所有任务完成
  41. results = []
  42. for i, task, async_task in async_tasks:
  43. try:
  44. result = await async_task
  45. results.append({
  46. "task_id": i,
  47. "tool_name": task["tool_name"],
  48. "input_data": task["input_data"],
  49. "result": result,
  50. "status": "success"
  51. })
  52. print(f"✅ 任务 {i+1} 完成: {task['tool_name']}")
  53. except Exception as e:
  54. results.append({
  55. "task_id": i,
  56. "tool_name": task["tool_name"],
  57. "input_data": task["input_data"],
  58. "result": str(e),
  59. "status": "error"
  60. })
  61. print(f"❌ 任务 {i+1} 失败: {task['tool_name']} - {e}")
  62. print(f"🎉 并行执行完成,成功: {sum(1 for r in results if r['status'] == 'success')}/{len(results)}")
  63. return results
  64. async def execute_tools_batch(self, tool_name: str, input_list: List[str]) -> List[Dict[str, Any]]:
  65. """
  66. 批量执行同一个工具
  67. Args:
  68. tool_name: 工具名称
  69. input_list: 输入数据列表
  70. Returns:
  71. 执行结果列表
  72. """
  73. tasks = [
  74. {"tool_name": tool_name, "input_data": input_data}
  75. for input_data in input_list
  76. ]
  77. return await self.execute_tools_parallel(tasks)
  78. def close(self):
  79. """关闭执行器"""
  80. self.executor.shutdown(wait=True)
  81. print("🔒 异步工具执行器已关闭")
  82. def __enter__(self):
  83. return self
  84. def __exit__(self, exc_type, exc_val, exc_tb):
  85. self.close()
  86. # 便捷函数
  87. async def run_parallel_tools(registry: ToolRegistry, tasks: List[Dict[str, str]], max_workers: int = 4) -> List[Dict[str, Any]]:
  88. """
  89. 便捷函数:并行执行多个工具
  90. Args:
  91. registry: 工具注册表
  92. tasks: 任务列表
  93. max_workers: 最大工作线程数
  94. Returns:
  95. 执行结果列表
  96. """
  97. async with AsyncToolExecutor(registry, max_workers) as executor:
  98. return await executor.execute_tools_parallel(tasks)
  99. async def run_batch_tool(registry: ToolRegistry, tool_name: str, input_list: List[str], max_workers: int = 4) -> List[Dict[str, Any]]:
  100. """
  101. 便捷函数:批量执行同一个工具
  102. Args:
  103. registry: 工具注册表
  104. tool_name: 工具名称
  105. input_list: 输入数据列表
  106. max_workers: 最大工作线程数
  107. Returns:
  108. 执行结果列表
  109. """
  110. async with AsyncToolExecutor(registry, max_workers) as executor:
  111. return await executor.execute_tools_batch(tool_name, input_list)
  112. # 同步包装函数(为了兼容性)
  113. def run_parallel_tools_sync(registry: ToolRegistry, tasks: List[Dict[str, str]], max_workers: int = 4) -> List[Dict[str, Any]]:
  114. """同步版本的并行工具执行"""
  115. return asyncio.run(run_parallel_tools(registry, tasks, max_workers))
  116. def run_batch_tool_sync(registry: ToolRegistry, tool_name: str, input_list: List[str], max_workers: int = 4) -> List[Dict[str, Any]]:
  117. """同步版本的批量工具执行"""
  118. return asyncio.run(run_batch_tool(registry, tool_name, input_list, max_workers))
  119. # 示例函数
  120. async def demo_parallel_execution():
  121. """演示并行执行的示例"""
  122. from .registry import ToolRegistry
  123. # 创建注册表(这里假设已经注册了工具)
  124. registry = ToolRegistry()
  125. # 定义并行任务
  126. tasks = [
  127. {"tool_name": "my_calculator", "input_data": "2 + 2"},
  128. {"tool_name": "my_calculator", "input_data": "3 * 4"},
  129. {"tool_name": "my_calculator", "input_data": "sqrt(16)"},
  130. {"tool_name": "my_calculator", "input_data": "10 / 2"},
  131. ]
  132. # 并行执行
  133. results = await run_parallel_tools(registry, tasks)
  134. # 显示结果
  135. print("\n📊 并行执行结果:")
  136. for result in results:
  137. status_icon = "✅" if result["status"] == "success" else "❌"
  138. print(f"{status_icon} {result['tool_name']}({result['input_data']}) = {result['result']}")
  139. return results
  140. if __name__ == "__main__":
  141. # 运行演示
  142. asyncio.run(demo_parallel_execution())