MindOS_CLI/mindcli/_vendor/tools/cli_tunnel_tool.py
lidf 3a1ecd7adc fix: 补全 vendor 快照(model_tools/utils/toolsets/gateway/cron/hermes_time)+ python-dotenv 依赖
- vendor_hermes.sh: 补全遗漏的单文件模块和目录
- pyproject.toml: 添加 python-dotenv 隐性依赖
- 全量 import 测试通过(216 .py, 180K 行)
- firecrawl/fal_client 为可选工具,缺失时优雅跳过
2026-04-29 02:24:04 +08:00

325 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
CLI Tunnel 工具 — 动态注册 Mind CLI 本地工具到 Hermes 编排器。
设计遵循 SPEC 铁律 B能力报告义务
CLI 连接时上报能力 → Cloud 审批 → 动态注册到 ToolRegistry
CLI 断开 → 从 ToolRegistry 注销
注册模式参照 MCP 动态发现mcp_tool._register_server_tools
用户隔离参照飞书连接器feishu_tool.handler(args.user_id))。
与内置工具通过 `cli_` 前缀区分:
cli_terminal = 在用户本地电脑执行命令
terminal = 在云端服务器执行命令
"""
import asyncio
import json
import logging
from typing import Any
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger("tools.cli_tunnel")
# ── 模块级 bridge 引用(由 mindos_sse.py 注入) ────────────
_bridge = None
def set_bridge(bridge) -> None:
"""注入 MindCLIBridge 实例(进程生命周期内调用一次)。"""
global _bridge
_bridge = bridge
# ── 工具 Schema 模板 ─────────────────────────────────────
# 每个 CLI 内置工具对应一个 schema 模板,用于注册到 LLM function calling。
# 只有 capability_report 中出现的工具才会被注册。
_TOOL_SCHEMAS: dict[str, dict] = {
"terminal": {
"name": "cli_terminal",
"description": (
"在用户的本地电脑上执行终端命令。"
"用于需要访问用户本地文件系统、开发环境或系统工具的场景。"
"注意:这是在用户个人电脑上执行,不是在服务器上。"
),
"parameters": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "要执行的 shell 命令",
},
"cwd": {
"type": "string",
"description": "工作目录(可选,默认用户 home",
},
"timeout": {
"type": "integer",
"description": "超时秒数(可选,默认 30",
},
},
"required": ["command"],
},
},
"file_read": {
"name": "cli_file_read",
"description": (
"读取用户本地电脑上的文件内容。"
"用于查看用户电脑上的配置文件、代码、文档等。"
),
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "文件绝对路径",
},
},
"required": ["path"],
},
},
"file_write": {
"name": "cli_file_write",
"description": (
"写入文件到用户本地电脑。"
"用于创建或更新用户电脑上的文件。"
),
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "文件绝对路径",
},
"content": {
"type": "string",
"description": "文件内容",
},
},
"required": ["path", "content"],
},
},
"file_ops": {
"name": "cli_file_ops",
"description": (
"在用户本地电脑上执行文件操作(复制/移动/删除/列表)。"
),
"parameters": {
"type": "object",
"properties": {
"operation": {
"type": "string",
"description": "操作类型",
"enum": ["copy", "move", "delete", "list"],
},
"source": {
"type": "string",
"description": "源路径",
},
"destination": {
"type": "string",
"description": "目标路径copy/move 时必填)",
},
},
"required": ["operation", "source"],
},
},
"grep": {
"name": "cli_grep",
"description": (
"在用户本地电脑上搜索文件内容。"
"用于在用户项目中查找代码、配置等。"
),
"parameters": {
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": "搜索模式",
},
"path": {
"type": "string",
"description": "搜索路径(可选,默认当前目录)",
},
},
"required": ["pattern"],
},
},
"code_execution": {
"name": "cli_code_execution",
"description": (
"在用户本地电脑上执行代码片段Python"
"用于需要在用户本地环境中运行脚本的场景。"
),
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "要执行的代码",
},
"language": {
"type": "string",
"description": "编程语言(当前仅支持 python",
"enum": ["python"],
},
},
"required": ["code"],
},
},
}
# ── 动态注册 / 注销 ──────────────────────────────────────
def register_cli_tools(
user_id: str,
capabilities: dict,
) -> list[str]:
"""
CLI 连接时调用——根据能力报告动态注册工具到 Hermes ToolRegistry。
Args:
user_id: 用户 ID
capabilities: CLI 上报的 capability_report
Returns:
已注册的工具名列表(带 cli_ 前缀)
"""
from toolsets import create_custom_toolset, TOOLSETS
if not _bridge:
logger.warning("[CliTunnel] bridge 未初始化,跳过工具注册")
return []
reported_tools = {t["name"] for t in capabilities.get("tools", [])}
registered: list[str] = []
for local_name, schema in _TOOL_SCHEMAS.items():
if local_name not in reported_tools:
continue
prefixed_name = schema["name"] # e.g. "cli_terminal"
# 避免与非 CLI 内置工具冲突
existing_toolset = registry.get_toolset_for_tool(prefixed_name)
if existing_toolset and existing_toolset != "connectors":
logger.warning(
"[CliTunnel] 工具 '%s' 与 toolset '%s' 冲突,跳过",
prefixed_name, existing_toolset,
)
continue
registry.register(
name=prefixed_name,
toolset="connectors",
schema=schema,
handler=_make_cli_handler(local_name, user_id),
check_fn=_make_check_fn(user_id),
is_async=False,
description=schema["description"],
emoji="💻",
)
registered.append(prefixed_name)
# 注入到 hermes-* umbrella toolsets 以确保通过 enabled_toolsets 过滤
if registered:
for ts_name, ts in TOOLSETS.items():
if ts_name.startswith("hermes-"):
for tool_name in registered:
if tool_name not in ts["tools"]:
ts["tools"].append(tool_name)
logger.info(
"[CliTunnel] 为用户 %s 注册 %d 个本地工具: %s",
user_id, len(registered), ", ".join(registered),
)
return registered
def deregister_cli_tools(tools: list[str]) -> None:
"""
CLI 断开时调用——从 ToolRegistry 注销工具。
Args:
tools: 之前 register_cli_tools 返回的工具名列表
"""
from toolsets import TOOLSETS
for name in tools:
registry.deregister(name)
# 从 hermes-* umbrella toolsets 中移除
for ts_name, ts in TOOLSETS.items():
if ts_name.startswith("hermes-"):
try:
ts["tools"].remove(name)
except ValueError:
pass
if tools:
logger.info("[CliTunnel] 注销 %d 个本地工具: %s", len(tools), ", ".join(tools))
# ── Handler 工厂 ─────────────────────────────────────────
def _make_cli_handler(local_tool_name: str, user_id: str):
"""
返回闭包 handler走 Tunnel 派发工具调用。
签名handler(args, **kwargs) -> str符合 registry.dispatch 要求)
"""
def _handler(args: dict, **kwargs) -> str:
if not _bridge:
return tool_error("CLI Bridge 未初始化")
if not _bridge.is_connected(user_id):
return tool_error(
f"本地 CLI 未连接。请确保 Mind CLI 正在运行并已连接隧道。"
)
# 同步调用异步 dispatch复用 model_tools._run_async 模式)
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
coro = _bridge.dispatch_tool_call(user_id, local_tool_name, args)
try:
if loop and loop.is_running():
# 在 async 上下文中SSE gateway→ 开线程
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(asyncio.run, coro)
result = future.result(timeout=130)
else:
result = asyncio.run(coro)
except Exception as e:
logger.error("[CliTunnel] 工具 %s 调用失败: %s", local_tool_name, e)
return tool_error(f"本地工具调用失败: {e}")
# result 是 dict从 CLI 返回的 JSON-RPC result
if isinstance(result, dict) and "error" in result:
return tool_error(result["error"])
return json.dumps(
{"ok": True, "tool": local_tool_name, "result": result},
ensure_ascii=False,
)
return _handler
def _make_check_fn(user_id: str):
"""
返回 check_fn 闭包——registry 每次 get_definitions 时调用。
只有当 bridge 存在且该用户 CLI 在线时才返回 True
离线时工具自动从 LLM 视野消失。
"""
def _check() -> bool:
return bool(_bridge and _bridge.is_connected(user_id))
return _check