MindOS_CLI/mindcli/managed_mcp.py
lidf 69dd868e2f init: MindOS CLI 本地执行体(从 mindOSv2/mindos-cli 独立)
- 独立 pyproject.toml(pip install -e .)
- vendor_hermes.sh 已改为显式路径模式(不再依赖相对目录)
- 包含 hermes vendor 快照
2026-04-28 13:12:54 +08:00

184 lines
6.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Mind CLI — Managed MCP 治理层。
在 _vendor/tools/ 之上加白名单过滤。Cloud 审批通过的工具才能执行。
Managed 模式下:只允许 approved_tools 列表中的工具。
"""
import asyncio
import logging
import os
import subprocess
import sys
from typing import Any
# Module-level logger used by ManagedMCP to report allowlist changes,
# rejected tool calls, and executor failures.
logger = logging.getLogger("mindcli.managed_mcp")
class ManagedMCP:
"""
治理层:只暴露 Cloud 审批通过的工具。
Cloud 通过 Tunnel 握手下发 approved_tools 白名单,
后续 tool_call 请求先过白名单检查,再委托到 _vendor/tools/ 执行。
"""
def __init__(self, approved_tools: list[str] | None = None):
self._approved: set[str] = set(approved_tools or [])
# 工具名 → 执行函数的映射
self._executors: dict[str, Any] = {}
self._register_executors()
def update_approved(self, tools: list[str]) -> None:
"""Cloud 热更新白名单(无需重连)。"""
old = self._approved
self._approved = set(tools)
added = self._approved - old
removed = old - self._approved
if added:
logger.info("[ManagedMCP] 新增审批工具: %s", added)
if removed:
logger.info("[ManagedMCP] 移除审批工具: %s", removed)
def is_approved(self, tool_name: str) -> bool:
"""检查工具是否在白名单中。"""
return tool_name in self._approved
async def execute(self, tool_name: str, params: dict) -> dict:
"""
执行工具调用。
Args:
tool_name: 工具名(如 "terminal""grep"
params: 工具参数
Returns:
{"output": "...", "exit_code": 0} 或 {"error": "..."}
"""
if not self.is_approved(tool_name):
logger.warning("[ManagedMCP] 工具 '%s' 未审批,拒绝执行", tool_name)
return {"error": f"Tool '{tool_name}' not approved by Cloud"}
executor = self._executors.get(tool_name)
if not executor:
return {"error": f"Tool '{tool_name}' has no executor"}
try:
result = await executor(params)
return result
except Exception as e:
logger.error("[ManagedMCP] 工具 '%s' 执行异常: %s", tool_name, e)
return {"error": f"Execution failed: {str(e)}"}
def _register_executors(self) -> None:
"""注册内置工具的执行函数。"""
self._executors["terminal"] = self._exec_terminal
self._executors["file_read"] = self._exec_file_read
self._executors["file_write"] = self._exec_file_write
self._executors["grep"] = self._exec_grep
self._executors["file_ops"] = self._exec_file_ops
self._executors["code_execution"] = self._exec_code
# ── 内置工具执行器 ──────────────────────────────
async def _exec_terminal(self, params: dict) -> dict:
"""执行终端命令。"""
command = params.get("command", "")
cwd = params.get("cwd", os.path.expanduser("~"))
timeout = params.get("timeout", 30)
try:
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
)
stdout, stderr = await asyncio.wait_for(
proc.communicate(), timeout=timeout
)
return {
"output": stdout.decode("utf-8", errors="replace"),
"stderr": stderr.decode("utf-8", errors="replace"),
"exit_code": proc.returncode,
}
except asyncio.TimeoutError:
proc.kill()
return {"error": f"Command timed out after {timeout}s", "exit_code": -1}
async def _exec_file_read(self, params: dict) -> dict:
"""读取文件内容。"""
path = params.get("path", "")
if not path or not os.path.isfile(path):
return {"error": f"File not found: {path}"}
try:
with open(path, "r", encoding="utf-8", errors="replace") as f:
content = f.read()
return {"output": content, "size": len(content)}
except Exception as e:
return {"error": str(e)}
async def _exec_file_write(self, params: dict) -> dict:
"""写入文件。"""
path = params.get("path", "")
content = params.get("content", "")
if not path:
return {"error": "No path specified"}
try:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(content)
return {"output": f"Written {len(content)} bytes to {path}"}
except Exception as e:
return {"error": str(e)}
async def _exec_grep(self, params: dict) -> dict:
"""文本搜索ripgrep / grep"""
pattern = params.get("pattern", "")
path = params.get("path", ".")
if not pattern:
return {"error": "No pattern specified"}
cmd = f"grep -rn {_shell_quote(pattern)} {_shell_quote(path)}"
return await self._exec_terminal({"command": cmd, "timeout": 15})
async def _exec_file_ops(self, params: dict) -> dict:
"""文件操作copy / move / delete。"""
op = params.get("operation", "")
src = params.get("source", "")
dst = params.get("destination", "")
if op == "copy":
cmd = f"cp -r {_shell_quote(src)} {_shell_quote(dst)}"
elif op == "move":
cmd = f"mv {_shell_quote(src)} {_shell_quote(dst)}"
elif op == "delete":
cmd = f"rm -rf {_shell_quote(src)}"
elif op == "list":
cmd = f"ls -la {_shell_quote(src)}"
else:
return {"error": f"Unknown operation: {op}"}
return await self._exec_terminal({"command": cmd, "timeout": 15})
async def _exec_code(self, params: dict) -> dict:
"""执行代码片段Python"""
code = params.get("code", "")
language = params.get("language", "python")
if language != "python":
return {"error": f"Unsupported language: {language}"}
return await self._exec_terminal({
"command": f"{sys.executable} -c {_shell_quote(code)}",
"timeout": 30,
})
def _shell_quote(s: str) -> str:
"""简单的 shell 参数转义。"""
import shlex
return shlex.quote(s)