""" Mind CLI — 命令行入口。 通过 Click 定义 `mind` 命令族。 - chat/ask 走 _vendor/run_agent.py 的 headless AIAgent(铁律 A:决策权归云端) - LLM 调用 100% 走 Cloud Gateway(JWT 计费),不在本地拉起 vendor TUI - --offline 备选:走 _vendor/cli.py 的完整 TUI(铁律 C:断开即自治) """ import click import json import os import sys # 确保 _vendor/ 已注入 sys.path import mindcli # noqa: F401 — 触发 __init__.py 的 sys.path 注入 # ── Cloud Gateway 配置 ──────────────────────────────────── def _get_cloud_gateway_url() -> str: """Cloud Gateway 的 LLM API base URL。""" return os.environ.get( "MINDOS_GATEWAY_URL", "https://agent.brainwork.club/mindos-next/llm", ) def _get_jwt() -> str | None: """获取 JWT(从 Tunnel 句柄或环境变量)。 优先从 health server 的 tunnel handle 获取; 离线/无 tunnel 时回退到 MINDOS_JWT 环境变量。 """ jwt = os.environ.get("MINDOS_JWT") if jwt: return jwt # 尝试从 health server 查询 tunnel 状态获取 JWT #(JWT 存在 tunnel handle 内存中,不落盘) import urllib.request try: req = urllib.request.Request("http://127.0.0.1:8660/tunnel/status") with urllib.request.urlopen(req, timeout=2) as resp: data = json.loads(resp.read()) if data.get("status") == "connected": # tunnel 已连接,JWT 在 handle 内存中 # 通过环境变量 MINDOS_JWT 传递(由 /tunnel/activate 时设置) return os.environ.get("MINDOS_JWT") except Exception: pass return None # ── Click 命令组 ────────────────────────────────────────── @click.group(invoke_without_command=True) @click.version_option(version=mindcli.__version__, prog_name="mind") @click.pass_context def main(ctx): """MindOS NEXT CLI — Cloud Hermes 的本地执行节点。""" if ctx.invoked_subcommand is None: click.echo(ctx.get_help()) @main.command() @click.option("--model", "-m", default="", help="模型名称(默认使用配置文件)") @click.option("--skills", "-s", multiple=True, help="加载指定 skill") @click.option("--resume", "-r", default="", help="恢复指定会话 ID") @click.option("--offline", is_flag=True, default=False, help="离线模式:使用 vendor TUI + 本地 LLM 配置(铁律 C:断开即自治)") def chat(model, skills, resume, offline): """进入交互式 Chat。 默认走 Cloud Gateway(JWT 计费)。 --offline 走 _vendor/cli.py 的完整 TUI(本地 LLM 配置)。 """ if offline: _chat_offline(model, skills, resume) return _chat_cloud(model, skills, resume) def _chat_cloud(model, skills, resume): """走 Cloud Gateway 的交互式 chat(run_agent headless)。""" jwt = _get_jwt() if not jwt: click.echo("❌ 未找到 JWT,请先连接 Tunnel:mind tunnel connect --token ") click.echo(" 或设置环境变量: export MINDOS_JWT=") sys.exit(1) from run_agent import AIAgent agent = AIAgent( base_url=_get_cloud_gateway_url(), api_key=jwt, model=model or None, enabled_toolsets=list(skills) if skills else None, session_id=resume or None, ) click.echo("🤖 Mind CLI (Cloud Gateway 模式)") click.echo("=" * 50) # 交互循环 conversation_history = [] system_message = None while True: try: user_input = input("\n你: ").strip() except (EOFError, KeyboardInterrupt): click.echo("\n👋 再见!") break if not user_input: continue if user_input.lower() in ("exit", "quit", "q"): click.echo("👋 再见!") break try: result = agent.run_conversation( user_message=user_input, system_message=system_message, conversation_history=conversation_history, ) # 更新会话历史 if result.get("messages"): conversation_history = result["messages"] response = result.get("response", "") if response: click.echo(f"\n🤖 {response}") except Exception as e: click.echo(f"\n❌ 错误: {e}") def _chat_offline(model, skills, resume): """离线模式:走 vendor cli.py 的完整 TUI(本地 LLM 配置)。""" from cli import main as hermes_main kwargs = {} if model: kwargs["model"] = model if skills: kwargs["skills"] = ",".join(skills) if resume: kwargs["resume"] = resume hermes_main(**kwargs) @main.command() @click.argument("question") @click.option("--model", "-m", default="", help="模型名称") @click.option("--format", "fmt", default="text", help="输出格式: text/json/markdown") def ask(question, model, fmt): """单次查询(= hermes -q)。 走 Cloud Gateway(JWT 计费)。 """ jwt = _get_jwt() if not jwt: click.echo("❌ 未找到 JWT,请先连接 Tunnel:mind tunnel connect --token ") sys.exit(1) from run_agent import AIAgent agent = AIAgent( base_url=_get_cloud_gateway_url(), api_key=jwt, model=model or None, ) try: result = agent.run_conversation(user_message=question) response = result.get("response", "") if fmt == "json": click.echo(json.dumps({"question": question, "answer": response}, ensure_ascii=False, indent=2)) elif fmt == "markdown": click.echo(f"## Q: {question}\n\n{response}") else: click.echo(response) except Exception as e: click.echo(f"❌ 错误: {e}", err=True) sys.exit(1) @main.command() def health(): """显示本地 CLI 健康状态。""" status = { "ok": True, "version": mindcli.__version__, "python": sys.version.split()[0], "vendor": _get_vendor_commit(), "tunnel": "disconnected", # Phase 2 实现 "tools": 0, # Phase 2 实现 } click.echo(json.dumps(status, indent=2, ensure_ascii=False)) @main.command() @click.option("--port", "-p", default=8660, help="Health Server 端口") def start(port): """启动 Health Server(前台运行,Ctrl+C 退出)。""" from mindcli.health import start_health_server start_health_server(port=port, foreground=True) @main.command(name="install-service") def install_service(): """注册为 macOS launchd 服务(开机自启)。""" from mindcli.service import install_service as _install _install() @main.command(name="uninstall-service") def uninstall_service(): """注销 macOS launchd 服务。""" from mindcli.service import uninstall_service as _uninstall _uninstall() @main.command() def capabilities(): """显示本地可用工具和 MCP Server。""" from mindcli.capability import scan_capabilities cap = scan_capabilities() click.echo(json.dumps(cap, indent=2, ensure_ascii=False)) @main.group() def tunnel(): """管理 Cloud ↔ Local 隧道。""" pass @tunnel.command() def status(): """查看 Tunnel 连接状态。""" import urllib.request try: req = urllib.request.Request("http://127.0.0.1:8660/tunnel/status") with urllib.request.urlopen(req, timeout=2) as resp: data = json.loads(resp.read()) click.echo(json.dumps(data, indent=2, ensure_ascii=False)) except Exception: click.echo(json.dumps({"status": "disconnected", "note": "Health Server 未运行"}, indent=2)) @tunnel.command() @click.option("--url", default="wss://agent.brainwork.club/mindos-next/ws/cli-tunnel", help="Cloud Tunnel WebSocket URL") @click.option("--token", required=True, help="MindPass JWT") def connect(url, token): """手动建立 Tunnel 连接(通常由浏览器自动触发)。""" import urllib.request data = json.dumps({"token": token, "tunnelUrl": url}).encode() try: req = urllib.request.Request( "http://127.0.0.1:8660/tunnel/activate", data=data, headers={"Content-Type": "application/json"}, ) with urllib.request.urlopen(req, timeout=5) as resp: result = json.loads(resp.read()) click.echo(f"✅ Tunnel activate: {result}") except Exception as e: click.echo(f"❌ 失败: {e}") click.echo(" 确认 Health Server 已运行(mind start)") @main.group() def record(): """管理本地系统录音。""" pass @record.command(name="start") @click.option("--token", default="", help="MindPass JWT(默认使用 Tunnel 已有的)") @click.option("--chat-id", default="", help="对话 ID") @click.option("--source", type=click.Choice(["system", "mic"]), default="system", help="音频源:system=系统音频(默认),mic=麦克风") def record_start(token, chat_id, source): """开始录音(独立音频源 → Cloud ASR)。 双工模式下,CLI 和浏览器各自独立推送,不做混音。 可多次调用不同 --source 启动多路录音(双工模式)。 """ import urllib.request body = json.dumps({"token": token, "chatId": chat_id, "source": source}).encode() try: req = urllib.request.Request( "http://127.0.0.1:8660/record/start", data=body, headers={"Content-Type": "application/json"}, ) with urllib.request.urlopen(req, timeout=10) as resp: result = json.loads(resp.read()) if result.get("ok"): click.echo(f"🎙️ 录音已开始 source={result.get('source')} meetingId={result.get('meetingId')}") click.echo(" 使用 `mind record stop` 停止") else: click.echo(f"❌ {result.get('error')}") except Exception as e: click.echo(f"❌ 失败: {e}") click.echo(" 确认 Health Server 已运行(mind start)") @record.command(name="stop") @click.option("--chat-id", default="", help="停止指定 chatId 的录音(默认停止全部)") def record_stop(chat_id): """停止录音。""" import urllib.request url = "http://127.0.0.1:8660/record/stop" if chat_id: url += f"?chatId={chat_id}" try: req = urllib.request.Request( url, data=b"{}", headers={"Content-Type": "application/json"}, ) with urllib.request.urlopen(req, timeout=10) as resp: result = json.loads(resp.read()) if result.get("ok"): if "stopped" in result: click.echo(f"⏹️ 已停止 {result['stopped']} 路录音") else: click.echo(f"⏹️ 录音已停止 duration={result.get('duration')}s") else: click.echo(f"❌ {result.get('error')}") except Exception as e: click.echo(f"❌ 失败: {e}") @record.command(name="status") def record_status(): """查看录音状态。""" import urllib.request try: req = urllib.request.Request("http://127.0.0.1:8660/record/status") with urllib.request.urlopen(req, timeout=2) as resp: data = json.loads(resp.read()) if data.get("running"): for cap in data.get("captures", []): click.echo(f"🎙️ 录音中 duration={cap.get('duration')}s " f"chatId={cap.get('chatId')} source={cap.get('source')}") else: click.echo("⏹️ 未在录音") except Exception: click.echo("⏹️ Health Server 未运行") @main.command() def update(): """检查并升级到最新版本。""" import subprocess click.echo(f"当前版本: v{mindcli.__version__}") click.echo("正在检查最新版本...") remote = _fetch_remote_info() if not remote: click.echo("⚠️ 无法检查远端版本(网络问题?)") return remoteVersion = remote.get("version") if not remoteVersion: click.echo("⚠️ 远端版本信息异常") return if remoteVersion == mindcli.__version__: click.echo(f"✅ 已是最新版本 v{mindcli.__version__}") return # 防降级:远端版本低于本地时,不升级(服务器 versions.json 可能未同步) if _compare_versions(remoteVersion, mindcli.__version__) < 0: click.echo(f"⚠️ 本地 v{mindcli.__version__} 已高于远端 v{remoteVersion}(服务器版本未同步?)") return click.echo(f"⬆️ 发现新版本 v{remoteVersion}") if remote.get("releaseNotes"): click.echo(f" {remote['releaseNotes']}") click.echo("正在升级...") # 优先用远端下发的 upgradeCmd;无则本地检测安装方式 installCmd = remote.get("upgradeCmd") or _detect_upgrade_command() click.echo(f" 执行: {installCmd}") result = subprocess.run( installCmd, shell=True, # 含引号/管道,必须 shell=True capture_output=True, text=True, ) if result.returncode == 0: click.echo(f"✅ 已升级到 v{remoteVersion}") click.echo(" 请重启 Mind CLI(mind start)以生效") else: click.echo(f"❌ 升级失败: {result.stderr[:300]}") click.echo(" 可手动执行: pipx reinstall mindos-cli") def _compare_versions(v1: str, v2: str) -> int: """比较两个语义化版本号。返回 -1 (v1v2)。""" def _parse(v: str): parts = [] for p in v.strip().split("."): try: parts.append(int(p)) except ValueError: parts.append(0) return parts a, b = _parse(v1), _parse(v2) # 补齐长度 while len(a) < len(b): a.append(0) while len(b) < len(a): b.append(0) for x, y in zip(a, b): if x < y: return -1 if x > y: return 1 return 0 def _detect_upgrade_command() -> str: """检测当前安装方式,返回正确的升级命令。 pipx 安装 → pipx reinstall(从 git URL 重新安装到隔离 venv) pip 安装 → python -m pip install --upgrade(用当前解释器的 pip) """ import sys # 检测是否运行在 pipx 的 venv 中 # pipx venv 路径特征:包含 .local/share/pipx/venvs/<包名> python_path = sys.executable if "pipx/venvs" in python_path or "pipx\\venvs" in python_path: return "pipx reinstall mindos-cli" # 默认 pip 升级(用 sys.executable -m pip,避免 PATH 里找不到 pip) return f"{sys.executable} -m pip install --upgrade git+https://git.brainwork.club/lidf/MindOS_CLI.git" def _fetch_remote_info() -> dict | None: """从 versions.json 获取远端 CLI 版本信息。 返回 {"version": str, "upgradeCmd": str|None, "releaseNotes": str|None} 或 None。 """ import urllib.request versionsUrl = "https://dl.brainwork.club/mindos-next/versions.json" try: req = urllib.request.Request(versionsUrl) with urllib.request.urlopen(req, timeout=5) as resp: data = json.loads(resp.read()) cli = data.get("cli", {}) return { "version": cli.get("version"), "upgradeCmd": cli.get("upgradeCmd"), "releaseNotes": cli.get("releaseNotes"), } except Exception: return None def _check_remote_version(): """从 versions.json 获取远端 CLI 最新版本号(兼容旧接口)。""" info = _fetch_remote_info() return info.get("version") if info else None def _get_vendor_commit() -> str: """读取 _vendor/VENDOR_COMMIT 文件获取 vendor 版本。""" commitFile = os.path.join(mindcli._VENDOR_DIR, "VENDOR_COMMIT") try: with open(commitFile) as f: for line in f: if line.startswith("commit:"): return line.split(":", 1)[1].strip() return "unknown" except FileNotFoundError: return "unknown" if __name__ == "__main__": main()