""" CLI Tunnel 工具 — 动态注册 Mind CLI 本地工具到 Hermes 编排器。 设计遵循 SPEC 铁律 B(能力报告义务): CLI 连接时上报能力 → Cloud 审批 → 动态注册到 ToolRegistry CLI 断开 → 从 ToolRegistry 注销 注册模式参照 MCP 动态发现(mcp_tool._register_server_tools), 用户隔离参照飞书连接器(feishu_tool.handler(args.user_id))。 与内置工具通过 `cli_` 前缀区分: cli_terminal = 在用户本地电脑执行命令 terminal = 在云端服务器执行命令 """ import asyncio import json import logging from typing import Any from tools.registry import registry, tool_error, tool_result logger = logging.getLogger("tools.cli_tunnel") # ── 模块级 bridge 引用(由 mindos_sse.py 注入) ──────────── _bridge = None def set_bridge(bridge) -> None: """注入 MindCLIBridge 实例(进程生命周期内调用一次)。""" global _bridge _bridge = bridge # ── 工具 Schema 模板 ───────────────────────────────────── # 每个 CLI 内置工具对应一个 schema 模板,用于注册到 LLM function calling。 # 只有 capability_report 中出现的工具才会被注册。 _TOOL_SCHEMAS: dict[str, dict] = { "terminal": { "name": "cli_terminal", "description": ( "在用户的本地电脑上执行终端命令。" "用于需要访问用户本地文件系统、开发环境或系统工具的场景。" "注意:这是在用户个人电脑上执行,不是在服务器上。" ), "parameters": { "type": "object", "properties": { "command": { "type": "string", "description": "要执行的 shell 命令", }, "cwd": { "type": "string", "description": "工作目录(可选,默认用户 home)", }, "timeout": { "type": "integer", "description": "超时秒数(可选,默认 30)", }, }, "required": ["command"], }, }, "file_read": { "name": "cli_file_read", "description": ( "读取用户本地电脑上的文件内容。" "用于查看用户电脑上的配置文件、代码、文档等。" ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "文件绝对路径", }, }, "required": ["path"], }, }, "file_write": { "name": "cli_file_write", "description": ( "写入文件到用户本地电脑。" "用于创建或更新用户电脑上的文件。" ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "文件绝对路径", }, "content": { "type": "string", "description": "文件内容", }, }, "required": ["path", "content"], }, }, "file_ops": { "name": "cli_file_ops", "description": ( "在用户本地电脑上执行文件操作(复制/移动/删除/列表)。" ), "parameters": { "type": "object", "properties": { "operation": { "type": "string", "description": "操作类型", "enum": ["copy", "move", "delete", "list"], }, "source": { "type": "string", "description": "源路径", }, "destination": { "type": "string", "description": "目标路径(copy/move 时必填)", }, }, "required": ["operation", "source"], }, }, "grep": { "name": "cli_grep", "description": ( "在用户本地电脑上搜索文件内容。" "用于在用户项目中查找代码、配置等。" ), "parameters": { "type": "object", "properties": { "pattern": { "type": "string", "description": "搜索模式", }, "path": { "type": "string", "description": "搜索路径(可选,默认当前目录)", }, }, "required": ["pattern"], }, }, "code_execution": { "name": "cli_code_execution", "description": ( "在用户本地电脑上执行代码片段(Python)。" "用于需要在用户本地环境中运行脚本的场景。" ), "parameters": { "type": "object", "properties": { "code": { "type": "string", "description": "要执行的代码", }, "language": { "type": "string", "description": "编程语言(当前仅支持 python)", "enum": ["python"], }, }, "required": ["code"], }, }, } # ── 动态注册 / 注销 ────────────────────────────────────── def register_cli_tools( user_id: str, capabilities: dict, ) -> list[str]: """ CLI 连接时调用——根据能力报告动态注册工具到 Hermes ToolRegistry。 Args: user_id: 用户 ID capabilities: CLI 上报的 capability_report Returns: 已注册的工具名列表(带 cli_ 前缀) """ from toolsets import create_custom_toolset, TOOLSETS if not _bridge: logger.warning("[CliTunnel] bridge 未初始化,跳过工具注册") return [] reported_tools = {t["name"] for t in capabilities.get("tools", [])} registered: list[str] = [] for local_name, schema in _TOOL_SCHEMAS.items(): if local_name not in reported_tools: continue prefixed_name = schema["name"] # e.g. "cli_terminal" # 避免与非 CLI 内置工具冲突 existing_toolset = registry.get_toolset_for_tool(prefixed_name) if existing_toolset and existing_toolset != "connectors": logger.warning( "[CliTunnel] 工具 '%s' 与 toolset '%s' 冲突,跳过", prefixed_name, existing_toolset, ) continue registry.register( name=prefixed_name, toolset="connectors", schema=schema, handler=_make_cli_handler(local_name, user_id), check_fn=_make_check_fn(user_id), is_async=False, description=schema["description"], emoji="💻", ) registered.append(prefixed_name) # 注入到 hermes-* umbrella toolsets 以确保通过 enabled_toolsets 过滤 if registered: for ts_name, ts in TOOLSETS.items(): if ts_name.startswith("hermes-"): for tool_name in registered: if tool_name not in ts["tools"]: ts["tools"].append(tool_name) logger.info( "[CliTunnel] 为用户 %s 注册 %d 个本地工具: %s", user_id, len(registered), ", ".join(registered), ) return registered def deregister_cli_tools(tools: list[str]) -> None: """ CLI 断开时调用——从 ToolRegistry 注销工具。 Args: tools: 之前 register_cli_tools 返回的工具名列表 """ from toolsets import TOOLSETS for name in tools: registry.deregister(name) # 从 hermes-* umbrella toolsets 中移除 for ts_name, ts in TOOLSETS.items(): if ts_name.startswith("hermes-"): try: ts["tools"].remove(name) except ValueError: pass if tools: logger.info("[CliTunnel] 注销 %d 个本地工具: %s", len(tools), ", ".join(tools)) # ── Handler 工厂 ───────────────────────────────────────── def _make_cli_handler(local_tool_name: str, user_id: str): """ 返回闭包 handler,走 Tunnel 派发工具调用。 签名:handler(args, **kwargs) -> str(符合 registry.dispatch 要求) """ def _handler(args: dict, **kwargs) -> str: if not _bridge: return tool_error("CLI Bridge 未初始化") if not _bridge.is_connected(user_id): return tool_error( f"本地 CLI 未连接。请确保 Mind CLI 正在运行并已连接隧道。" ) # 同步调用异步 dispatch(复用 model_tools._run_async 模式) try: loop = asyncio.get_running_loop() except RuntimeError: loop = None coro = _bridge.dispatch_tool_call(user_id, local_tool_name, args) try: if loop and loop.is_running(): # 在 async 上下文中(SSE gateway)→ 开线程 import concurrent.futures with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: future = pool.submit(asyncio.run, coro) result = future.result(timeout=130) else: result = asyncio.run(coro) except Exception as e: logger.error("[CliTunnel] 工具 %s 调用失败: %s", local_tool_name, e) return tool_error(f"本地工具调用失败: {e}") # result 是 dict(从 CLI 返回的 JSON-RPC result) if isinstance(result, dict) and "error" in result: return tool_error(result["error"]) return json.dumps( {"ok": True, "tool": local_tool_name, "result": result}, ensure_ascii=False, ) return _handler def _make_check_fn(user_id: str): """ 返回 check_fn 闭包——registry 每次 get_definitions 时调用。 只有当 bridge 存在且该用户 CLI 在线时才返回 True, 离线时工具自动从 LLM 视野消失。 """ def _check() -> bool: return bool(_bridge and _bridge.is_connected(user_id)) return _check