terminal_tool.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724
  1. """TerminalTool - 命令行工具
  2. 为Agent提供安全的命令行执行能力,支持:
  3. - 文件系统操作(ls, cat, head, tail, find, grep)
  4. - 文本处理(wc, sort, uniq)
  5. - 目录导航(pwd, cd)
  6. - 安全限制(白名单命令、路径限制、超时控制)
  7. 使用场景:
  8. - JIT(即时)文件检索与分析
  9. - 代码仓库探索
  10. - 日志文件分析
  11. - 数据文件预览
  12. 安全特性:
  13. - 命令白名单(只允许安全的只读命令)
  14. - 工作目录限制(沙箱)
  15. - 超时控制
  16. - 输出大小限制
  17. - 禁止危险操作(rm, mv, chmod等)
  18. """
  19. from typing import Dict, Any, List, Optional
  20. import subprocess
  21. import os
  22. from pathlib import Path
  23. import shlex
  24. import re
  25. from ..base import Tool, ToolParameter
  26. class TerminalTool(Tool):
  27. """命令行工具
  28. 提供安全的命令行执行能力,支持常用的文件系统和文本处理命令。
  29. 安全限制:
  30. - 只允许白名单中的命令
  31. - 限制在指定工作目录内
  32. - 超时控制(默认30秒)
  33. - 输出大小限制(默认10MB)
  34. 用法示例:
  35. ```python
  36. terminal = TerminalTool(workspace="./project")
  37. # 列出文件
  38. result = terminal.run({"command": "ls -la"})
  39. # 查看文件内容
  40. result = terminal.run({"command": "cat README.md"})
  41. # 搜索文件
  42. result = terminal.run({"command": "grep -r 'TODO' src/"})
  43. # 查看文件前10行
  44. result = terminal.run({"command": "head -n 10 data.csv"})
  45. ```
  46. """
  47. # 允许的命令白名单
  48. # 这些命令被认为是安全的,主要用于文件查看、文本处理和信息获取
  49. # 不包含可能修改系统或造成安全风险的命令(如rm、mv、chmod等)
  50. ALLOWED_COMMANDS = {
  51. # 文件列表与信息
  52. 'ls', 'dir', 'tree',
  53. # 文件内容查看
  54. 'cat', 'head', 'tail', 'less', 'more',
  55. # 文件搜索
  56. 'find', 'grep', 'egrep', 'fgrep', 'rg',
  57. # 文本处理
  58. 'wc', 'sort', 'uniq', 'cut', 'awk', 'sed',
  59. # shell 常见内建(用于管道/小脚本;仍受整体策略约束)
  60. 'echo', 'printf',
  61. # 目录/文件创建(受路径沙箱约束)
  62. 'mkdir',
  63. # 目录操作
  64. 'pwd', 'cd',
  65. # 文件信息
  66. 'file', 'stat', 'du', 'df',
  67. # 其他
  68. 'which', 'whereis',
  69. # 版本控制(只读子命令会被进一步限制)
  70. 'git',
  71. }
  72. # 常见 shell 元字符(用于检测"组合命令/写盘/子命令"等风险点;不再一刀切禁止)
  73. # 这些元字符在shell中有特殊含义,可能用于组合命令或执行危险操作
  74. # 系统会检测这些字符的存在,并根据安全策略决定是否允许执行
  75. SHELL_META_TOKENS = ["|", "||", "&&", ";", ">", ">>", "<", "$(", "`"]
  76. # 需要人类确认的高风险命令(MVP:只做识别;是否放行由上层策略决定)
  77. # 这些命令可能对系统造成不可逆的修改,需要用户明确确认才能执行
  78. DANGEROUS_BASE_COMMANDS = {"rm", "chmod"}
  79. # Git的高风险子命令,可能造成代码丢失或历史修改
  80. DANGEROUS_GIT_SUBCOMMANDS = {("reset", "--hard"), ("reset", "--hard", "HEAD")}
  81. def __init__(
  82. self,
  83. workspace: str = ".",
  84. timeout: int = 30,
  85. max_output_size: int = 10 * 1024 * 1024, # 10MB
  86. allow_cd: bool = True,
  87. confirm_dangerous: bool = False,
  88. default_shell_mode: bool = False,
  89. ):
  90. """初始化TerminalTool实例
  91. Args:
  92. workspace: 工作目录路径,所有命令将在此目录或其子目录中执行
  93. timeout: 命令执行超时时间(秒),防止长时间运行的命令
  94. max_output_size: 输出大小限制(字节),防止过大输出消耗资源
  95. allow_cd: 是否允许cd命令,控制目录切换权限
  96. confirm_dangerous: 是否在执行高风险命令时提示用户确认
  97. default_shell_mode: 默认是否启用shell模式(支持管道、重定向等)
  98. """
  99. super().__init__(
  100. name="terminal",
  101. description="命令行工具 - 执行安全的文件系统、文本处理和代码执行命令(ls, cat, grep, head, tail等)"
  102. )
  103. # 将工作目录转换为绝对路径并规范化
  104. self.workspace = Path(workspace).resolve()
  105. self.timeout = timeout
  106. self.max_output_size = max_output_size
  107. self.allow_cd = allow_cd
  108. self.confirm_dangerous = confirm_dangerous
  109. self.default_shell_mode = default_shell_mode
  110. # 当前工作目录(相对于workspace)
  111. # 初始设置为工作目录根目录,可通过cd命令更改
  112. self.current_dir = self.workspace
  113. # 确保工作目录存在,如果不存在则创建
  114. self.workspace.mkdir(parents=True, exist_ok=True)
  115. def run(self, parameters: Dict[str, Any]) -> str:
  116. """执行工具的主入口方法
  117. 根据参数解析命令并执行,包含完整的安全检查流程:
  118. 1. 参数验证 - 确保输入参数格式正确且必要参数存在
  119. 2. 命令解析和分类 - 将命令字符串解析为可执行的参数列表
  120. 3. 安全策略检查 - 多层安全验证,包括白名单检查、危险操作检测等
  121. 4. 命令执行和结果处理 - 安全执行命令并格式化返回结果
  122. 安全机制说明:
  123. - 命令白名单:只执行预定义的安全命令,防止恶意命令执行
  124. - 路径沙箱:所有文件操作限制在工作目录内,防止越权访问
  125. - 危险操作确认:高风险命令需要用户明确确认才能执行
  126. - 超时控制:防止长时间运行的命令消耗系统资源
  127. - 输出限制:防止过大输出导致内存问题
  128. Args:
  129. parameters: 包含command、allow_dangerous、shell_mode等参数的字典
  130. - command: 要执行的命令字符串
  131. - allow_dangerous: 是否允许执行危险操作
  132. - shell_mode: 是否启用shell模式(支持管道、重定向等)
  133. Returns:
  134. str: 命令执行结果或错误信息,包含详细的错误说明
  135. """
  136. # 第一步:参数验证 - 确保输入参数符合预期格式
  137. if not self.validate_parameters(parameters):
  138. return "❌ 参数验证失败"
  139. # 提取并清理命令参数
  140. command = parameters.get("command", "").strip()
  141. allow_dangerous = bool(parameters.get("allow_dangerous", False))
  142. shell_mode = bool(parameters.get("shell_mode", self.default_shell_mode))
  143. # 基础安全检查:拒绝空命令,防止无意义的系统调用
  144. if not command:
  145. return "❌ 命令不能为空"
  146. # 执行模式选择:根据shell_mode参数决定执行方式
  147. # shell_mode=True: 支持管道、重定向等复杂shell语法,但需要更严格的安全检查
  148. # shell_mode=False: 使用argv模式,更安全但不支持shell特性
  149. if shell_mode:
  150. return self._execute_shell(command, allow_dangerous=allow_dangerous)
  151. # 第二步:命令解析 - 使用shlex进行安全的命令分割,处理引号和转义
  152. try:
  153. parts = shlex.split(command)
  154. except ValueError as e:
  155. return f"❌ 命令解析失败: {e}"
  156. # 解析后再次验证,确保命令不为空
  157. if not parts:
  158. return "❌ 命令不能为空"
  159. base_command = parts[0]
  160. # 第三步:安全策略检查 - 命令白名单验证
  161. # 这是第一道安全防线,确保只能执行预定义的安全命令
  162. if base_command not in self.ALLOWED_COMMANDS:
  163. return f"❌ 不允许的命令: {base_command}\n允许的命令: {', '.join(sorted(self.ALLOWED_COMMANDS))}"
  164. # 特殊命令处理:git命令需要额外的子命令安全检查
  165. if base_command == "git":
  166. return self._handle_git(parts, allow_dangerous)
  167. # 第四步:危险操作确认机制
  168. # 当用户明确允许危险操作且启用了确认机制时,进行交互式确认
  169. # 这为高风险操作提供了最后一道人工确认防线
  170. if allow_dangerous and self.confirm_dangerous:
  171. ans = input(f"\n⚠️ 高风险命令:{command}\n允许执行?(y/n)\nconfirm> ").strip().lower()
  172. if ans not in {"y", "yes"}:
  173. return "⛔️ 已取消执行(用户未确认)。"
  174. # 特殊命令处理:cd命令需要单独处理以维护工作目录状态
  175. if base_command == 'cd':
  176. return self._handle_cd(parts)
  177. # 第五步:执行命令 - 通过所有安全检查后执行命令
  178. return self._execute_argv(parts, allow_dangerous=allow_dangerous)
  179. def get_parameters(self) -> List[ToolParameter]:
  180. """获取工具参数定义"""
  181. return [
  182. ToolParameter(
  183. name="command",
  184. type="string",
  185. description=(
  186. f"要执行的命令(白名单: {', '.join(sorted(list(self.ALLOWED_COMMANDS)[:10]))}...)\n"
  187. "示例: 'ls -la', 'cat file.txt', 'grep pattern *.py', 'head -n 20 data.csv'"
  188. ),
  189. required=True
  190. ),
  191. ToolParameter(
  192. name="allow_dangerous",
  193. type="boolean",
  194. description="是否允许高风险命令(默认false;仅在用户明确确认后才可设置为true)",
  195. required=False
  196. ),
  197. ToolParameter(
  198. name="shell_mode",
  199. type="boolean",
  200. description="是否允许 shell 语义(管道/重定向/多段命令等)。默认继承工具配置。",
  201. required=False,
  202. ),
  203. ]
  204. def _contains_shell_meta(self, command: str) -> bool:
  205. """检查命令中是否包含shell元字符
  206. Args:
  207. command: 要检查的命令字符串
  208. Returns:
  209. bool: 如果包含元字符返回True,否则返回False
  210. """
  211. return any(tok in command for tok in self.SHELL_META_TOKENS)
  212. # --- shell parsing helpers (ignore operators inside quotes) ---
  213. def _split_shell_segments(self, command: str) -> List[str]:
  214. """
  215. 按管道/逻辑与/逻辑或/分号操作符分割shell命令,忽略引号内的操作符。
  216. 返回分割后的段列表(不包含操作符)。
  217. 这个方法用于分析复杂的shell命令,确保每个段都是安全的。
  218. Args:
  219. command: 要分割的shell命令字符串
  220. Returns:
  221. List[str]: 分割后的命令段列表
  222. """
  223. ops = ["||", "&&", "|", ";"]
  224. segs: List[str] = []
  225. buf: List[str] = []
  226. i = 0
  227. quote: Optional[str] = None
  228. while i < len(command):
  229. ch = command[i]
  230. if ch in {"'", '"'}:
  231. if quote is None:
  232. quote = ch
  233. elif quote == ch:
  234. quote = None
  235. buf.append(ch)
  236. i += 1
  237. continue
  238. if ch == "\\":
  239. buf.append(ch)
  240. if i + 1 < len(command):
  241. buf.append(command[i + 1])
  242. i += 2
  243. else:
  244. i += 1
  245. continue
  246. if quote is None:
  247. matched = False
  248. for op in ops:
  249. if command.startswith(op, i):
  250. seg = "".join(buf).strip()
  251. if seg:
  252. segs.append(seg)
  253. buf = []
  254. i += len(op)
  255. matched = True
  256. break
  257. if matched:
  258. continue
  259. buf.append(ch)
  260. i += 1
  261. seg = "".join(buf).strip()
  262. if seg:
  263. segs.append(seg)
  264. return segs
  265. def _has_unquoted(self, command: str, token: str) -> bool:
  266. """检查token(如>或$()或|)是否出现在引号外
  267. 这个方法用于检测可能存在安全风险的shell操作符,
  268. 确保它们不是在引号内(引号内是安全的字符串字面量)。
  269. Args:
  270. command: 要检查的命令字符串
  271. token: 要查找的token字符串
  272. Returns:
  273. bool: 如果token出现在引号外返回True,否则返回False
  274. """
  275. q: Optional[str] = None
  276. i = 0
  277. while i < len(command):
  278. ch = command[i]
  279. if ch in {"'", '"'}:
  280. if q is None:
  281. q = ch
  282. elif q == ch:
  283. q = None
  284. i += 1
  285. continue
  286. if ch == "\\":
  287. i += 2
  288. continue
  289. if q is None and command.startswith(token, i):
  290. return True
  291. i += 1
  292. return False
  293. def _shell_requires_allow_dangerous(self, command: str) -> bool:
  294. """检查shell命令是否需要危险操作权限
  295. 此方法分析shell命令,判断其是否包含可能对系统造成风险的操作。
  296. 这是安全机制的重要组成部分,用于识别需要额外权限确认的命令。
  297. 安全检查逻辑:
  298. 1. 检查文件写入操作(>、>>重定向),但排除到/dev/null的重定向
  299. 2. 检查命令替换(`command`或$(command))
  300. 3. 检查已知的危险基础命令(rm、chmod等)
  301. 4. 检查Git的危险子命令(如reset --hard)
  302. Args:
  303. command: 要检查的shell命令字符串
  304. Returns:
  305. bool: 如果命令需要危险操作权限则返回True,否则返回False
  306. Note:
  307. - 此方法不会阻止命令执行,只是标识需要额外确认的命令
  308. - 实际的权限检查在执行阶段进行
  309. - 采用相对保守的策略,对/dev/null重定向等安全操作给予宽容
  310. """
  311. # 写盘/命令替换通常视为高风险,但常见的只读掩埋(如 2>/dev/null、|| echo)可放宽
  312. # 宽容规则:仅当重定向目标不是 /dev/null 时才视为写盘;简单的 "|| echo ..." 视为安全。
  313. if self._has_unquoted(command, ">") or self._has_unquoted(command, ">>"):
  314. # 忽略 /dev/null 重定向(这是安全的丢弃输出操作)
  315. if re.search(r">\s*/dev/null", command) or re.search(r">>\s*/dev/null", command):
  316. pass
  317. else:
  318. # 其他重定向操作可能修改文件内容,需要危险权限
  319. return True
  320. # 检查命令替换:`command`(反引号)和 $(command) 格式
  321. # 命令替换可能执行任意代码,存在代码注入风险
  322. if self._has_unquoted(command, "$(") or self._has_unquoted(command, "`"):
  323. return True
  324. # 检查已知的危险基础命令
  325. # 这些命令可能对系统造成不可逆的影响,需要特别关注
  326. if re.search(r"(^|\s)rm(\s|$)", command):
  327. return True
  328. if re.search(r"(^|\s)chmod(\s|$)", command):
  329. return True
  330. # 检查Git的危险子命令
  331. # git reset --hard 可能导致代码丢失,属于高风险操作
  332. if re.search(r"git\s+reset\s+--hard", command):
  333. return True
  334. # 如果没有检测到危险操作,则不需要额外权限
  335. return False
  336. def _shell_all_commands_whitelisted(self, command: str) -> bool:
  337. """
  338. 静态检查shell命令中的所有段是否都在白名单中(尽力而为的检查)
  339. 此方法通过分割shell命令为多个段,然后检查每个段的第一个命令是否在白名单中。
  340. 这是对shell命令的安全预检查,用于在未允许危险操作时确保命令的安全性。
  341. 安全检查策略:
  342. 1. 使用shell元字符分割命令为多个独立段
  343. 2. 对每个段进行命令解析,提取基础命令
  344. 3. 检查基础命令是否在白名单中
  345. 4. 对git命令进行特殊处理,只允许安全的只读子命令
  346. Args:
  347. command: 要检查的完整shell命令字符串
  348. Returns:
  349. bool: 如果所有命令段都在白名单中则返回True,否则返回False
  350. Note:
  351. - 这是一个尽力而为的检查,不能保证100%准确
  352. - 对于复杂的shell语法,可能存在误判
  353. - git命令只允许status和diff子命令,其他子命令被视为危险操作
  354. """
  355. # 预处理:将换行符替换为空格,便于统一处理
  356. cmd = command.replace("\n", " ")
  357. # 分割命令为多个段,每个段包含一个独立的命令
  358. segments = self._split_shell_segments(cmd)
  359. # 对每个命令段进行安全检查
  360. for seg in segments:
  361. seg = seg.strip()
  362. if not seg:
  363. continue # 跳过空段
  364. try:
  365. # 使用shlex进行安全的命令分割,处理引号和转义
  366. argv = shlex.split(seg)
  367. except Exception:
  368. # 如果解析失败,认为不安全
  369. return False
  370. if not argv:
  371. continue # 跳过空命令段
  372. # 获取基础命令(命令名的第一部分)
  373. base = argv[0]
  374. # 检查基础命令是否在白名单中
  375. if base not in self.ALLOWED_COMMANDS:
  376. return False
  377. # 对git命令进行特殊处理
  378. # 只允许安全的只读子命令,其他子命令被视为危险操作
  379. if base == "git":
  380. if len(argv) < 2:
  381. return False # git命令必须包含子命令
  382. if argv[1] not in {"status", "diff"}:
  383. return False # 只允许status和diff子命令
  384. # 所有命令段都通过了安全检查
  385. return True
  386. def _execute_shell(self, command: str, allow_dangerous: bool = False) -> str:
  387. """
  388. 执行shell命令字符串(支持Claude Code风格的shell特性)
  389. 此方法提供安全的shell命令执行能力,支持管道、重定向、命令替换等复杂shell语法。
  390. 通过多层安全检查机制确保命令执行的安全性。
  391. 安全防护措施:
  392. - 如果shell命令包含重定向/命令替换/已知危险操作 -> 需要allow_dangerous权限
  393. - 如果未允许危险操作 -> 要求所有命令段都在白名单中(尽力而为的检查)
  394. - confirm_dangerous可以提示用户确认包含shell元字符或需要allow_dangerous的命令
  395. 执行流程:
  396. 1. 检查命令是否需要危险权限
  397. 2. 验证命令白名单(如果未允许危险操作)
  398. 3. 用户确认(如果启用confirm_dangerous)
  399. 4. 执行命令并处理输出
  400. 5. 返回执行结果或错误信息
  401. Args:
  402. command: 要执行的完整shell命令字符串
  403. allow_dangerous: 是否允许执行危险操作(默认False)
  404. Returns:
  405. str: 命令执行结果或错误信息
  406. Note:
  407. - 支持管道操作而无需确认(如 'ls | grep .py')
  408. - 只有在可能写入文件/转义/执行危险操作时才需要确认
  409. - 输出会被截断以防止内存问题
  410. """
  411. needs_allow = self._shell_requires_allow_dangerous(command)
  412. # 第一层安全检查:危险操作检测
  413. # 如果命令包含危险操作但未允许危险操作,则拒绝执行
  414. # 这是第一道安全防线,防止潜在的恶意操作
  415. if needs_allow and not allow_dangerous:
  416. return "❌ 该命令包含写盘/子命令替换/高风险操作,需用户确认后再执行(allow_dangerous=true)"
  417. # 第二层安全检查:白名单验证
  418. # 如果未允许危险操作,则检查所有命令段是否都在白名单中
  419. # 这是对命令的进一步安全验证,确保只能执行预定义的安全命令
  420. if not allow_dangerous and not self._shell_all_commands_whitelisted(command):
  421. return "❌ shell_mode 下检测到非白名单命令/不允许的 git 子命令。需要用户确认后再执行(allow_dangerous=true)"
  422. # Claude Code-like: pipes are allowed without confirmation; only confirm when it may write/escape/execute dangerous ops.
  423. if self.confirm_dangerous and (allow_dangerous or needs_allow):
  424. ans = input(f"\n⚠️ 即将执行高风险 shell 命令:{command}\n允许执行?(y/n)\nconfirm> ").strip().lower()
  425. if ans not in {"y", "yes"}:
  426. return "⛔️ 已取消执行(用户未确认)。"
  427. try:
  428. result = subprocess.run(
  429. command,
  430. shell=True,
  431. cwd=str(self.current_dir),
  432. capture_output=True,
  433. text=True,
  434. timeout=self.timeout,
  435. env=os.environ.copy(),
  436. )
  437. output = (result.stdout or "") + (result.stderr or "")
  438. if len(output.encode("utf-8", errors="ignore")) > self.max_output_size:
  439. output = output[: self.max_output_size] + "\n...output truncated...\n"
  440. if result.returncode != 0:
  441. return f"命令执行失败 (返回码 {result.returncode}):\n{output}"
  442. return output.strip() if output.strip() else "(no output)"
  443. except subprocess.TimeoutExpired:
  444. return f"❌ 命令超时(>{self.timeout}s)"
  445. except Exception as e:
  446. return f"❌ 命令执行异常: {e}"
  447. def _handle_cd(self, parts: List[str]) -> str:
  448. """处理cd命令,实现安全的目录切换
  449. 此方法专门处理cd命令,确保目录切换在允许的工作空间范围内进行。
  450. 支持绝对路径、相对路径和特殊路径(如..、~)的处理。
  451. Args:
  452. parts: cd命令的参数列表,parts[1]是目标路径
  453. Returns:
  454. str: 执行结果或错误信息
  455. Note:
  456. - 只允许切换到工作空间内的目录
  457. - 支持路径规范化,处理..和.等相对路径
  458. - 不允许切换到工作空间之外的目录
  459. """
  460. if not self.allow_cd:
  461. return "❌ cd 命令已禁用"
  462. if len(parts) < 2:
  463. # cd命令没有参数时,返回当前目录信息
  464. return f"当前目录: {self.current_dir}"
  465. target_dir = parts[1]
  466. # 处理特殊的相对路径
  467. if target_dir == "..":
  468. # 切换到父目录
  469. new_dir = self.current_dir.parent
  470. elif target_dir == ".":
  471. # 当前目录
  472. new_dir = self.current_dir
  473. elif target_dir == "~":
  474. # 切换到工作目录根目录
  475. new_dir = self.workspace
  476. else:
  477. # 解析相对或绝对路径
  478. new_dir = (self.current_dir / target_dir).resolve()
  479. # 检查新目录是否在工作空间范围内
  480. try:
  481. new_dir.relative_to(self.workspace)
  482. except ValueError:
  483. return f"❌ 不允许访问工作目录外的路径: {new_dir}"
  484. # 验证目录存在性
  485. if not new_dir.exists():
  486. return f"❌ 目录不存在: {new_dir}"
  487. # 确保目标是目录而非文件
  488. if not new_dir.is_dir():
  489. return f"❌ 不是目录: {new_dir}"
  490. # 更新当前工作目录
  491. self.current_dir = new_dir
  492. return f"✅ 切换到目录: {self.current_dir}"
  493. def _execute_argv(self, argv: List[str], allow_dangerous: bool = False) -> str:
  494. """执行参数向量形式的命令(不使用shell解释)
  495. 此方法直接执行命令及其参数,不通过shell解释,因此不支持管道、重定向等shell特性。
  496. 这种方式更安全,因为避免了shell注入攻击的风险,但功能相对有限。
  497. 安全检查流程:
  498. 1. 检查高风险命令(rm、chmod等)是否需要用户确认
  499. 2. 对允许的高风险命令进行路径沙箱限制
  500. 3. 对mkdir命令进行路径沙箱限制
  501. 4. 执行命令并处理结果
  502. Args:
  503. argv: 命令及其参数的列表,argv[0]是命令名,其余是参数
  504. allow_dangerous: 是否允许执行高风险命令(默认False)
  505. Returns:
  506. str: 命令执行结果或错误信息
  507. Note:
  508. - 不支持shell特性如管道、重定向、变量替换等
  509. - 更安全,避免了shell注入攻击
  510. - 适用于简单的命令执行场景
  511. - 所有路径操作都被限制在工作空间内
  512. """
  513. # 第一层安全检查:高风险命令二次门禁
  514. # 对明确高风险基命令(rm/chmod等)进行额外检查
  515. # 这些命令可能对系统造成不可逆的修改,需要用户明确确认
  516. # 这是安全防护的第一道防线,防止意外的系统修改
  517. if argv and argv[0] in self.DANGEROUS_BASE_COMMANDS and not allow_dangerous:
  518. return f"❌ 高风险命令 {argv[0]} 需要人类确认(allow_dangerous=true)"
  519. # 第二层安全检查:路径沙箱限制
  520. # 对带路径参数的高风险命令做路径沙箱限制(仅当放行时)
  521. # 确保即使允许执行危险命令,也只能在工作空间内操作
  522. # 这防止了恶意用户通过相对路径或符号链接逃逸沙箱
  523. if argv and argv[0] in {"rm", "chmod"} and allow_dangerous:
  524. # 保守检查:所有看起来像路径的非标志参数都必须在工作空间内
  525. # 使用current_dir作为基准,确保相对路径也被正确限制
  526. for a in argv[1:]:
  527. if a.startswith("-"):
  528. continue # 跳过选项参数(如 -r, -f 等)
  529. candidate = (self.current_dir / a).resolve()
  530. try:
  531. # 检查解析后的绝对路径是否在工作空间内
  532. candidate.relative_to(self.workspace)
  533. except ValueError:
  534. # 路径超出工作空间范围,拒绝执行
  535. return f"❌ 拒绝在工作目录外操作: {a}"
  536. # 第三层安全检查:mkdir命令路径沙箱
  537. # mkdir虽然相对安全,但仍需确保不会在工作空间外创建目录
  538. # 这防止了通过mkdir命令在工作空间外创建后门或敏感目录
  539. if argv and argv[0] == "mkdir":
  540. for a in argv[1:]:
  541. if a.startswith("-"):
  542. continue # 跳过选项参数(如 -p, -m 等)
  543. candidate = (self.current_dir / a).resolve()
  544. try:
  545. # 检查要创建的目录是否在工作空间内
  546. candidate.relative_to(self.workspace)
  547. except ValueError:
  548. # 尝试在工作空间外创建目录,拒绝执行
  549. return f"❌ 不允许在工作目录外创建目录: {a}"
  550. try:
  551. # 直接执行命令,不通过shell
  552. # 使用shell=False确保命令不被shell解释,避免注入攻击
  553. # 使用capture_output=True捕获标准输出和标准错误
  554. # 使用text=True确保输出为文本格式
  555. result = subprocess.run(
  556. argv,
  557. shell=False,
  558. cwd=str(self.current_dir),
  559. capture_output=True,
  560. text=True,
  561. timeout=self.timeout,
  562. env=os.environ.copy(),
  563. )
  564. # 合并标准输出和标准错误
  565. # 这样用户可以看到完整的执行结果
  566. output = result.stdout
  567. if result.stderr:
  568. output += f"\n[stderr]\n{result.stderr}"
  569. # 检查输出大小,防止过大输出消耗内存
  570. if len(output) > self.max_output_size:
  571. output = output[:self.max_output_size]
  572. output += f"\n\n⚠️ 输出被截断(超过 {self.max_output_size} 字节)"
  573. # 添加返回码信息,帮助用户了解命令执行状态
  574. if result.returncode != 0:
  575. output = f"⚠️ 命令返回码: {result.returncode}\n\n{output}"
  576. return output if output else "✅ 命令执行成功(无输出)"
  577. except subprocess.TimeoutExpired:
  578. # 命令执行超时,可能是死循环或处理大量数据
  579. return f"❌ 命令执行超时(超过 {self.timeout} 秒)"
  580. except Exception as e:
  581. # 捕获其他异常,如权限错误、文件不存在等
  582. return f"❌ 命令执行失败: {e}"
  583. def _truncate_output(self, output: str) -> str:
  584. """截断过大的输出,防止内存问题
  585. 此方法检查输出字符串的长度,如果超过配置的最大输出大小,
  586. 则截断输出并添加提示信息,防止过大的输出消耗过多内存。
  587. Args:
  588. output: 需要检查的输出字符串
  589. Returns:
  590. str: 原始输出或截断后的输出(如果超过大小限制)
  591. Note:
  592. - 截断时会保留开头的内容,丢弃超出限制的部分
  593. - 会在截断的输出末尾添加提示信息,说明输出已被截断
  594. """
  595. if len(output) > self.max_output_size:
  596. return output[: self.max_output_size] + f"\n[输出被截断,超过 {self.max_output_size} 字节限制]"
  597. return output
  598. def get_current_dir(self) -> str:
  599. """获取当前工作目录"""
  600. return str(self.current_dir)
  601. def reset_dir(self):
  602. """重置到工作目录根"""
  603. self.current_dir = self.workspace