diff --git a/docs/SPEC_mindcli_atomization.md b/docs/SPEC_mindcli_atomization.md new file mode 100644 index 0000000..1fbc50e --- /dev/null +++ b/docs/SPEC_mindcli_atomization.md @@ -0,0 +1,598 @@ +# SPEC: Mind CLI 无状态原子化改造 + +> **版本**: v1.0 +> **日期**: 2026-07-01 +> **状态**: Draft — 待确认后执行 +> **适用范围**: MindOS_CLI (`mindcli/` 薄壳层,不含 `_vendor/`) +> **前置依赖**: `SPEC_mindos_next_cli.md` v2.3(设计哲学与三铁律) +> **参照模式**: `hermes-overlay/infra/pipelines/anyfile2md.py`(无状态管线组合器) + +--- + +## 一、改造动机 + +### 1.1 当前状态:有状态单体 + +`mindcli/` 薄壳层(7 个文件 / 1633 行)当前是**进程级有状态单体**,具体表现: + +| 模块 | 状态性 | 行数 | 问题 | +|:---|:---|---:|:---| +| `recorder.py` | 模块级单例 `_recorder` + `_running` 互斥 | 436 | **一进程只能一路录音**,双工模式不可用 | +| `tunnel.py` | 模块级单例 `_tunnel_client`,反向写 `health._tunnel_status` | 242 | 循环耦合,状态经全局变量传递 | +| `health.py` | 模块级全局 `_loop` / `_tunnel_status` / `_tool_count` | 278 | 状态枢纽 + 端口 8660 绑定 | +| `managed_mcp.py` | 实例级状态,经 tunnel 单例间接唯一 | 183 | 随 tunnel 存活,无法独立测试 | +| `cli.py` | 自身无状态,但 `mind chat` 拉起 vendor TUI | 272 | `_vendor/cli.py` 有 7 个模块级全局 + `sys.exit()` | +| `capability.py` | 无状态纯函数 | 79 | ✅ 已达标 | +| `service.py` | 无状态纯函数 | 116 | ✅ 已达标 | + +### 1.2 与设计目标的偏差 + +对照 `SPEC_mindos_next_cli.md` 的三铁律: + +| 铁律 | 设计要求 | 当前实现 | 偏差 | +|:---|:---|:---|:---:| +| **A:决策权归云端** | CLI 的 LLM 调用 100% 走 Cloud Gateway(JWT 计费) | `mind chat` 拉起 vendor TUI,本地持有 `conversation_history` + SQLite session,`sys.exit()` 杀进程 | 🔴 端有自己的脑 | +| **B:能力报告义务** | CLI 连接时报告能力 | `capability.py` 无状态扫描 | ✅ 已达标 | +| **C:连接即受管/断开即自治** | 两种模式互斥,边界清晰 | tunnel 单例 + recorder 单例耦合在同一进程,断开后 recorder 无受管方 | 🟡 边界模糊 | + +### 1.3 最严重的功能性后果 + +**双工模式在代码层面不可能实现。** + +设计文档 §Phase4 明确要求: + +> 双工模式 = 浏览器 mic session A + CLI system session B,两个独立 ASR session。 + +但当前 `recorder.py` 的实现: + +```python +# recorder.py:35-43 +_recorder: "SystemRecorder | None" = None + +def get_recorder() -> "SystemRecorder": + global _recorder + if _recorder is None: + _recorder = SystemRecorder() + return _recorder + +# recorder.py:110-111(start 方法内) +if self._running: + return {"error": "录音已在进行中"} +``` + +单例 + `_running` 互斥 = 第二路录音直接返回 error。双工模式不可能。 + +### 1.4 参照模式:anyfile2md + +`hermes-overlay/infra/pipelines/anyfile2md.py`(177 行)是项目内已验证的"无状态管线组合器"范式: + +``` +infra/atoms/ ← 原子(text_sniffer / vlm_ocr / page_rasterizer ...) + ↑ 组合 +infra/pipelines/ ← 无状态管线(anyfile2md: parse() / parseLocal() / isSupported()) + ↑ 调用 +platforms/*_sse ← SSE 子类(绑定 SSE 推送 + DB + 积分等状态化职责) +``` + +**关键特征**: +- 管线层无 `self`、无类属性、无模块级可变状态。 +- 文件头声明:"管线只做编排,不做逻辑。无状态:不写 DB、不推 SSE、不管 session。" +- 进度通知通过 `onProgress` 回调注入,管线本身不依赖它。 +- 状态化职责(DB / SSE / 积分)全部下沉到调用方 SSE 子类。 + +Mind CLI 应照此分层:**把 recorder/tunnel/managed_mcp 的核心逻辑抽成无状态管线,状态化职责上交给 health server 调用方。** + +--- + +## 二、改造目标 + +### 2.1 不改的 + +- **不把 MindCLI 变成 bookqa_sse 那样的独立无状态 SSE 原子。** MindCLI 是 Cloud 的执行节点(设计文档 §1.1),依赖 Cloud Gateway 做 LLM 调用、依赖 Cloud ASR 做转写。它的"连接态"必须存在,不可能做到 `stateless: True`。 +- **不碰 `_vendor/` 内部。** vendor 快照是只读副本,内部重构应在 hermes 上游做,快照只消费。 +- **不改 `capability.py` 和 `service.py`。** 已是无状态纯函数,达标。 + +### 2.2 要改的 + +| # | 改造对象 | 目标 | 价值 | +|:---:|:---|:---|:---| +| 1 | `recorder.py` | 拆成 `pipelines/audio_capture.py`(无状态)+ health handler(有状态调用方) | **解锁双工模式** | +| 2 | `tunnel.py` | 拆成 `pipelines/tunnel_session.py`(无状态)+ health handler(持有 handle) | **消除循环耦合** | +| 3 | `managed_mcp.py` | 抽出 `pipelines/tool_proxy.py`(无状态执行)+ 实例级白名单 | 解耦生命周期 | +| 4 | `cli.py` 的 `chat`/`ask` | 走 `run_agent` headless 接口,不拉 vendor TUI | **修正铁律 A 偏差** | + +### 2.3 改造后的分层架构 + +``` +Layer 3: 调用方(有状态,进程级) + mindcli/health.py — HTTP handler + 持有: _tunnel_handle, _active_captures: dict[chatId, CaptureHandle] + 职责: 端口绑定、生命周期管理、状态聚合 + ↓ 调用 +Layer 2: 无状态管线(新增,类比 infra/pipelines/) + mindcli/pipelines/audio_capture.py — capture(source, ws_ep, on_text) -> CaptureHandle + mindcli/pipelines/tunnel_session.py — connect(url, jwt, on_dispatch, on_status) -> TunnelHandle + mindcli/pipelines/tool_proxy.py — execute(tool, params, approved_set) -> dict + ↓ 组合 +Layer 1: 原子(无状态函数,已存在) + mindcli/capability.py — scan_capabilities() -> dict + mindcli/service.py — install_service() / uninstall_service() + managed_mcp executors — _exec_terminal / _exec_file_read / ...(纯 subprocess/IO) +``` + +--- + +## 三、详细设计 + +### 3.1 第一刀:recorder.py → pipelines/audio_capture.py + +**当前问题**(436 行): + +``` +recorder.py +├── _recorder 全局单例 (L35) +├── get_recorder() 工厂 (L38) +├── SystemRecorder 类 (L46) +│ ├── __init__: _running / _ws / _source / _chat_id / _buf / _loop ... (L54) +│ ├── start(): if _running: return error (L89, L110) +│ ├── stop() (L154) +│ ├── _start_mic() (L184) +│ ├── _start_system_audio() + _on_system_audio() (L206, L277) +│ ├── _stop_capture() (L295) +│ ├── _push_loop() (L316) +│ └── _ws_recv_loop() (L352) +├── _sync_get_sharable_content() (L375) ← macOS 权限检测,无状态 +├── _SCStreamDelegate 定义 (L397-432) ← pyobjc 类定义,无状态 +``` + +**拆分方案**: + +``` +mindcli/pipelines/audio_capture.py (新增,~200 行,无状态) + ├── async def capture(source, ws_endpoint, on_text) -> CaptureHandle + ├── class CaptureHandle: ← 轻量句柄,非单例 + │ async def stop() -> dict + │ def status() -> dict + ├── def check_permissions() -> bool ← 从 _sync_get_sharable_content 迁入 + └── _SCStreamDelegate (pyobjc 类,从 recorder.py 迁入) + +mindcli/health.py (修改,有状态调用方) + ├── _active_captures: dict[str, CaptureHandle] = {} ← 按 chatId 索引,可多实例 + ├── /record/start handler: + │ handle = await capture(source, ws_ep, on_text) + │ _active_captures[chatId] = handle + └── /record/stop handler: + handle = _active_captures.pop(chatId) + await handle.stop() +``` + +**核心变化**: + +```python +# ── 改造前(recorder.py,单例 + 互斥)───────────── +_recorder = None + +def get_recorder(): + global _recorder + if _recorder is None: + _recorder = SystemRecorder() + return _recorder + +class SystemRecorder: + async def start(self, ...): + if self._running: + return {"error": "录音已在进行中"} + # ... + +# ── 改造后(pipelines/audio_capture.py,无状态)─── +async def capture( + source: str, # "system" | "mic" + ws_endpoint: str, # Cloud ASR WebSocket URL + on_text: Callable, # 回调:转写结果 +) -> CaptureHandle: + """启动一路音频采集,返回独立句柄。无单例、无互斥。 + + 可多次调用,每次返回独立 handle,双工模式 = 两个 handle 并存。 + """ + handle = CaptureHandle(source, ws_endpoint, on_text) + await handle._start() + return handle + +class CaptureHandle: + """一路音频采集的句柄。生命周期由调用方管理。""" + # 不持有全局状态,不是单例 +``` + +**双工模式验证**: + +```python +# health.py 的 /record/start handler +handle_a = await capture("mic", ws_ep, on_text) # 浏览器 mic +_active_captures["chatA"] = handle_a + +handle_b = await capture("system", ws_ep, on_text) # CLI 系统音频 +_active_captures["chatB"] = handle_b +# ✅ 两路独立并存,符合设计文档 §Phase4 +``` + +### 3.2 第二刀:tunnel.py → pipelines/tunnel_session.py + +**当前问题**(242 行): + +``` +tunnel.py +├── TunnelClient 类 (L23) +│ ├── __init__: _jwt / _ws / _managed_mcp / 3 个后台 Task (L34) +│ ├── activate(jwt, url) (L70) +│ ├── disconnect() (L92) +│ ├── _connect_loop() (L115) ← 重连退避 +│ ├── _connect() (L139) ← 握手 + 能力上报 +│ ├── _run() (L188) ← 消息循环 +│ └── _handle_tool_call(msg) (L208) +├── _tunnel_client = TunnelClient() (L237) ← 模块级单例 +└── get_tunnel_client() (L240) +``` + +**循环耦合**:`tunnel.py:184` 反向 `from mindcli.health import set_tunnel_status`,写 `health._tunnel_status` 全局变量。 + +**拆分方案**: + +``` +mindcli/pipelines/tunnel_session.py (新增,~150 行,无状态) + ├── async def connect(url, jwt, on_dispatch, on_status) -> TunnelHandle + ├── class TunnelHandle: + │ async def disconnect() + │ def status() -> str + │ def update_approved(tools) + └── (重连/心跳/消息循环逻辑迁入,状态通过 on_status 回调通知) + +mindcli/health.py (修改) + ├── _tunnel_handle: TunnelHandle | None = None + ├── /tunnel/activate handler: + │ _tunnel_handle = await connect(url, jwt, + │ on_dispatch=self._dispatch_tool_call, + │ on_status=self._on_tunnel_status) ← 回调更新状态,不写全局变量 + └── def _on_tunnel_status(self, status, tools=0): + _tunnel_status = status ← health 自己的局部状态 +``` + +**核心变化**: + +```python +# ── 改造前(tunnel.py,单例 + 反向写 health 全局)── +_tunnel_client = TunnelClient() + +class TunnelClient: + async def _connect(self): + # ... + from mindcli.health import set_tunnel_status # ← 反向耦合 + set_tunnel_status("connected", len(tools)) + +# ── 改造后(pipelines/tunnel_session.py,回调注入)─ +async def connect( + url: str, + jwt: str, + on_dispatch: Callable, # 工具调用回调 + on_status: Callable, # 状态变更回调 +) -> TunnelHandle: + """建立 Tunnel 连接,状态变更通过 on_status 回调通知。 + + 不持有全局状态,不反向 import 调用方模块。 + """ + handle = TunnelHandle(url, jwt, on_dispatch, on_status) + await handle._start() + return handle +``` + +### 3.3 第三刀:managed_mcp.py → pipelines/tool_proxy.py + +**当前状态**(183 行):已接近无状态,白名单是实例级 `_approved: set[str]`,executors 是纯 subprocess/IO。但通过 `tunnel._managed_mcp` 间接成为进程级唯一实例。 + +**拆分方案**(改动最小): + +``` +mindcli/pipelines/tool_proxy.py (新增,~120 行) + ├── async def execute(tool_name, params, approved_set) -> dict + ├── _EXECUTORS: dict[str, Callable] ← 从 managed_mcp._register_executors 迁入 + └── def is_approved(tool_name, approved_set) -> bool + +mindcli/health.py 或 tunnel handle 持有 approved_set +``` + +executor 函数(`_exec_terminal` / `_exec_file_read` / `_exec_file_write` / `_exec_grep` / `_exec_file_ops` / `_exec_code`)本身就是纯 subprocess/IO,直接迁入即可。 + +### 3.4 第四刀:cli.py chat/ask 走 run_agent headless + +**当前问题**: + +```python +# cli.py:30-43(chat 命令) +@main.command() +def chat(model, skills, resume): + from cli import main as hermes_main # ← _vendor/cli.py 的 10000 行 TUI + hermes_main(**kwargs) # ← sys.exit() 杀进程 +``` + +`_vendor/cli.py` 的 `main()`: +- 写 `os.environ["HERMES_INTERACTIVE"] = "1"` +- 注册 `atexit` 钩子 +- 修改模块级 `_active_agent_ref` / `_active_worktree` / `_cleanup_done` +- 单查询模式 `sys.exit(0)` / `sys.exit(1)` + +**不可当作无状态函数调用。** + +**改造方案**: + +```python +# cli.py 改造后 +@main.command() +def chat(model, skills, resume): + """交互式 Chat — 走 Cloud Gateway(JWT 计费)。""" + from run_agent import AIAgent # ← headless,不拉 TUI + + agent = AIAgent( + model=model or None, + base_url=_get_cloud_gateway_url(), # 指向 Cloud Gateway + api_key=_get_jwt(), # JWT from tunnel handle + ) + agent.run_interactive(skills=skills, resume=resume) + +@main.command() +def ask(question, model, fmt): + """单次查询 — 走 Cloud Gateway(JWT 计费)。""" + from run_agent import AIAgent + + agent = AIAgent( + model=model or None, + base_url=_get_cloud_gateway_url(), + api_key=_get_jwt(), + ) + result = agent.run_query(question) + _output(result, fmt) +``` + +**vendor `cli.py` 的 TUI 只在"离线独立模式"(铁律 C 的断开即自治)时才拉起**,作为 `mind chat --offline` 的备选路径。 + +### 3.5 改造后文件结构 + +``` +mindcli/ +├── __init__.py # 22 行,不变 +├── __main__.py # 5 行,不变 +├── cli.py # ~250 行,chat/ask 改走 run_agent +├── health.py # ~350 行,持有 handles(状态枢纽) +├── service.py # 116 行,不变 +├── capability.py # 79 行,不变 +├── pipelines/ # ★ 新增目录 +│ ├── __init__.py +│ ├── audio_capture.py # ~200 行,从 recorder.py 抽出 +│ ├── tunnel_session.py # ~150 行,从 tunnel.py 抽出 +│ └── tool_proxy.py # ~120 行,从 managed_mcp.py 抽出 +└── _vendor/ # 不变 +``` + +**删除的文件**(逻辑迁入 pipelines/ 后): +- `recorder.py` → 逻辑迁入 `pipelines/audio_capture.py`,pyobjc delegate 类随之迁入 +- `tunnel.py` → 逻辑迁入 `pipelines/tunnel_session.py` +- `managed_mcp.py` → 逻辑迁入 `pipelines/tool_proxy.py` + +**薄壳净行数变化**: + +| 改造前 | 改造后 | +|:---|:---| +| 1633 行(7 文件) | ~1167 行(8 文件,含 pipelines/ 3 个新文件) | + +净减约 466 行(单例/工厂/全局变量的模板代码消除),同时新增"双工模式可用"和"循环耦合消除"两个能力。 + +--- + +## 四、改造后的状态边界 + +### 4.1 状态归属表 + +| 状态 | 改造前归属 | 改造后归属 | 改善 | +|:---|:---|:---|:---| +| 端口 8660 + event loop | `health._loop` (全局) | `health._loop` (不变) | — | +| Tunnel 连接 + JWT | `tunnel._tunnel_client` (单例) | `health._tunnel_handle` (调用方持有) | 消除循环耦合 | +| Tunnel 状态字符串 | `health._tunnel_status` (被 tunnel 反向写) | `health._tunnel_status` (on_status 回调写) | 单向数据流 | +| 录音句柄 | `recorder._recorder` (单例 + 互斥) | `health._active_captures[chatId]` (dict,可多实例) | **双工可用** | +| 工具白名单 | `tunnel._managed_mcp._approved` | `health._approved_set` 或 tunnel handle 持有 | 生命周期独立 | +| LLM 会话 | vendor `cli.py` 的 `HermesCLI.agent` + SQLite | `run_agent.AIAgent`(headless,走 Cloud Gateway) | **铁律 A 合规** | + +### 4.2 依赖图对比 + +**改造前**(有循环): + +``` +health.py ⇄ tunnel.py ← 经全局变量双向耦合 +health.py → recorder.py ← 经单例耦合 +tunnel.py → managed_mcp ← 经单例间接耦合 +cli.py → _vendor/cli.py ← 拉起重状态 TUI +``` + +**改造后**(单向,无环): + +``` +health.py → pipelines/tunnel_session.py (调用 connect,持 handle) +health.py → pipelines/audio_capture.py (调用 capture,持 handle) +health.py → pipelines/tool_proxy.py (调用 execute,持 approved_set) +pipelines/* → capability.py / executors (只组合原子,不反向 import health) +cli.py → run_agent.AIAgent (headless,走 Cloud Gateway) +``` + +### 4.3 铁律合规对照 + +| 铁律 | 改造前 | 改造后 | +|:---|:---:|:---:| +| A:决策权归云端 | 🔴 端有自己的脑(vendor TUI + 本地 session) | ✅ chat/ask 走 run_agent + Cloud Gateway JWT | +| B:能力报告义务 | ✅ | ✅(不变) | +| C:连接即受管/断开即自治 | 🟡 边界模糊 | ✅ tunnel handle + capture handle 生命周期由 health 管理,断开即释放 | + +--- + +## 五、实施路线图 + +### Phase 1: audio_capture 管线 + 双工解锁(2 天) + +``` +□ 创建 mindcli/pipelines/ 目录 + __init__.py +□ 从 recorder.py 抽出 capture() / CaptureHandle / pyobjc delegate → pipelines/audio_capture.py +□ 修改 health.py:/record/start 用 _active_captures[chatId] 替代 get_recorder() +□ 修改 health.py:/record/stop 从 _active_captures pop 后调 handle.stop() +□ 删除 recorder.py(逻辑已迁出) +□ 验证:mind record start --source system + mind record start --source mic 并行成功 +□ 验证:mind record stop 各自独立停止 +``` + +### Phase 2: tunnel_session 管线 + 解循环(1.5 天) + +``` +□ 从 tunnel.py 抽出 connect() / TunnelHandle → pipelines/tunnel_session.py +□ on_status 回调替代反向 import health.set_tunnel_status +□ 修改 health.py:/tunnel/activate 持有 _tunnel_handle +□ 删除 tunnel.py(逻辑已迁出) +□ 验证:mind tunnel connect → Cloud LLM 看到 local_* 工具 +□ 验证:断开后 _tunnel_handle = None,状态正确回退 +``` + +### Phase 3: tool_proxy 管线(0.5 天) + +``` +□ 从 managed_mcp.py 抽出 execute() / _EXECUTORS → pipelines/tool_proxy.py +□ 白名单 set 由调用方传入,不再经 tunnel 单例 +□ 删除 managed_mcp.py(逻辑已迁出) +□ 验证:工具调用正常,白名单过滤生效 +``` + +### Phase 4: chat/ask 走 run_agent(1 天) + +``` +□ cli.py chat/ask 改用 _vendor/run_agent.py 的 AIAgent headless 接口 +□ base_url 指向 Cloud Gateway,api_key 用 JWT +□ vendor cli.py TUI 保留为 mind chat --offline 备选 +□ 验证:mind ask "hello" → 走 Cloud Gateway → 积分扣费 +□ 验证:mind chat → 交互模式 → 走 Cloud Gateway +``` + +**总计 ~5 个工作日。** + +--- + +## 六、风险与回退 + +| 风险 | 等级 | 缓解 | +|:---|:---:|:---| +| pyobjc delegate 类迁移后权限引导断裂 | 🟡 | Phase 1 验证时首跑 `mind record start --source system`,确认 macOS 录屏权限弹窗正常 | +| run_agent headless 接口不支持 `--skills` | 🟡 | Phase 4 先验证 `AIAgent(model=, base_url=, api_key=)` 签名,必要时在 vendor 层薄封装 | +| Cloud Gateway JWT 链路未通 | 🟠 | Phase 4 可先保留 vendor TUI 路径作为 fallback,Gateway 通后再切 | +| 改造期间 _vendor 快照落后于 hermes 上游 | 🟢 | 改造不碰 _vendor,与 vendor 同步纪律正交 | + +**回退策略**:每个 Phase 独立,可按 Phase 回退。`recorder.py` / `tunnel.py` / `managed_mcp.py` 在 git 历史中保留,任一 Phase 失败可 revert 对应 commit。 + +--- + +## 七、决策记录 + +| # | 决策 | 理由 | +|:---|:---|:---| +| D1 | 选拆 recorder 为第一刀 | 双工模式是设计文档 §Phase4 的核心卖点,当前代码层面不可用,价值最高 | +| D2 | 用回调注入替代全局变量 | 消除 health ⇄ tunnel 循环耦合,单向数据流 | +| D3 | chat/ask 走 run_agent 而非 vendor TUI | 修正铁律 A(决策权归云端),vendor TUI 的 `sys.exit()` 和 7 个全局变量使其不可当函数调用 | +| D4 | 不把 CLI 变成 SSE 无状态原子 | CLI 是执行节点,依赖 Cloud Gateway/ASR,连接态必须存在 | +| D5 | 保留 vendor cli.py 作为 `--offline` 备选 | 铁律 C(断开即自治)要求离线模式可用 | +| D6 | pipelines/ 目录而非 infra/ | MindOS_CLI 是独立包,不依赖 hermes-overlay;pipelines/ 是自包含的无状态层 | + +--- + +## 附录:改造前后代码对照 + +### A.1 录音:单例 → 多实例 + +```python +# ═══ 改造前:recorder.py ════════════════════════════ +_recorder: "SystemRecorder | None" = None + +def get_recorder() -> "SystemRecorder": + global _recorder + if _recorder is None: + _recorder = SystemRecorder() + return _recorder + +class SystemRecorder: + async def start(self, source, ws_ep, chat_id, on_text): + if self._running: # ← 互斥 + return {"error": "录音已在进行中"} + # ... + +# ═══ 改造后:pipelines/audio_capture.py ════════════ +async def capture( + source: str, + ws_endpoint: str, + on_text: Callable[[str], None], +) -> CaptureHandle: + """启动一路采集,返回独立句柄。可多实例并存。""" + handle = CaptureHandle(source, ws_endpoint, on_text) + await handle._start() + return handle + +class CaptureHandle: + """一路音频采集的句柄。生命周期由调用方管理。""" + async def stop(self) -> dict: ... + def status(self) -> dict: ... +``` + +### A.2 Tunnel:全局变量 → 回调注入 + +```python +# ═══ 改造前:tunnel.py ══════════════════════════════ +_tunnel_client = TunnelClient() + +class TunnelClient: + async def _connect(self): + # ... + from mindcli.health import set_tunnel_status # ← 反向耦合 + set_tunnel_status("connected", len(tools)) + +# ═══ 改造后:pipelines/tunnel_session.py ═══════════ +async def connect( + url: str, + jwt: str, + on_dispatch: Callable, + on_status: Callable, # ← 状态变更通过回调通知 +) -> TunnelHandle: + handle = TunnelHandle(url, jwt, on_dispatch, on_status) + await handle._start() + return handle + +# health.py 调用方 +_tunnel_handle = await connect( + url, jwt, + on_dispatch=self._dispatch_tool_call, + on_status=lambda s, t=0: (_tunnel_status.__set_name__(...)) # 自己的状态自己写 +) +``` + +### A.3 chat/ask:vendor TUI → run_agent headless + +```python +# ═══ 改造前:cli.py ════════════════════════════════ +@main.command() +def chat(model, skills, resume): + from cli import main as hermes_main # ← _vendor/cli.py 10000 行 TUI + hermes_main(**kwargs) # ← sys.exit() 杀进程 + +# ═══ 改造后:cli.py ════════════════════════════════ +@main.command() +def chat(model, skills, resume): + from run_agent import AIAgent # ← headless + agent = AIAgent( + model=model or None, + base_url=_get_cloud_gateway_url(), + api_key=_get_jwt(), + ) + agent.run_interactive(skills=skills, resume=resume) +``` + +--- + +*最后更新:2026-07-01 v1.0* diff --git a/mindcli/__init__.py b/mindcli/__init__.py index 6a9ff46..55be9f2 100644 --- a/mindcli/__init__.py +++ b/mindcli/__init__.py @@ -11,7 +11,7 @@ POC 验证结论:sys.path.insert(0, _vendor_dir) 一行即可, import os import sys -__version__ = "0.1.0" +__version__ = "0.2.0" # ── Vendor 路径注入 ────────────────────────────────────────── # 将 _vendor/ 目录加入 sys.path 头部, diff --git a/mindcli/capability.py b/mindcli/capability.py index cd6af34..1d37a97 100644 --- a/mindcli/capability.py +++ b/mindcli/capability.py @@ -71,7 +71,7 @@ def _scan_mcp_servers() -> list[dict[str, str]]: def _get_vendor_commit() -> str: - commit_file = os.path.join(mindcli._VENDOR_DIR, "HERMES_COMMIT") + commit_file = os.path.join(mindcli._VENDOR_DIR, "VENDOR_COMMIT") try: with open(commit_file) as f: return f.read().strip() diff --git a/mindcli/cli.py b/mindcli/cli.py index 9be3065..f533f64 100644 --- a/mindcli/cli.py +++ b/mindcli/cli.py @@ -1,8 +1,10 @@ """ Mind CLI — 命令行入口。 -通过 Click 定义 `mind` 命令族,内部委托给 _vendor/ 中的 Hermes CLI。 -Phase 0 只实现 chat / ask / health 三个核心命令。 +通过 Click 定义 `mind` 命令族。 +- chat/ask 走 _vendor/run_agent.py 的 headless AIAgent(铁律 A:决策权归云端) +- LLM 调用 100% 走 Cloud Gateway(JWT 计费),不在本地拉起 vendor TUI +- --offline 备选:走 _vendor/cli.py 的完整 TUI(铁律 C:断开即自治) """ import click @@ -14,6 +16,44 @@ import sys import mindcli # noqa: F401 — 触发 __init__.py 的 sys.path 注入 +# ── Cloud Gateway 配置 ──────────────────────────────────── + +def _get_cloud_gateway_url() -> str: + """Cloud Gateway 的 LLM API base URL。""" + return os.environ.get( + "MINDOS_GATEWAY_URL", + "https://agent.brainwork.club/mindos-next/llm", + ) + + +def _get_jwt() -> str | None: + """获取 JWT(从 Tunnel 句柄或环境变量)。 + + 优先从 health server 的 tunnel handle 获取; + 离线/无 tunnel 时回退到 MINDOS_JWT 环境变量。 + """ + jwt = os.environ.get("MINDOS_JWT") + if jwt: + return jwt + + # 尝试从 health server 查询 tunnel 状态获取 JWT + #(JWT 存在 tunnel handle 内存中,不落盘) + import urllib.request + try: + req = urllib.request.Request("http://127.0.0.1:8660/tunnel/status") + with urllib.request.urlopen(req, timeout=2) as resp: + data = json.loads(resp.read()) + if data.get("status") == "connected": + # tunnel 已连接,JWT 在 handle 内存中 + # 通过环境变量 MINDOS_JWT 传递(由 /tunnel/activate 时设置) + return os.environ.get("MINDOS_JWT") + except Exception: + pass + return None + + +# ── Click 命令组 ────────────────────────────────────────── + @click.group(invoke_without_command=True) @click.version_option(version=mindcli.__version__, prog_name="mind") @click.pass_context @@ -27,11 +67,79 @@ def main(ctx): @click.option("--model", "-m", default="", help="模型名称(默认使用配置文件)") @click.option("--skills", "-s", multiple=True, help="加载指定 skill") @click.option("--resume", "-r", default="", help="恢复指定会话 ID") -def chat(model, skills, resume): - """进入交互式 Chat(复用 Hermes CLI 的完整 TUI)。""" +@click.option("--offline", is_flag=True, default=False, + help="离线模式:使用 vendor TUI + 本地 LLM 配置(铁律 C:断开即自治)") +def chat(model, skills, resume, offline): + """进入交互式 Chat。 + + 默认走 Cloud Gateway(JWT 计费)。 + --offline 走 _vendor/cli.py 的完整 TUI(本地 LLM 配置)。 + """ + if offline: + _chat_offline(model, skills, resume) + return + + _chat_cloud(model, skills, resume) + + +def _chat_cloud(model, skills, resume): + """走 Cloud Gateway 的交互式 chat(run_agent headless)。""" + jwt = _get_jwt() + if not jwt: + click.echo("❌ 未找到 JWT,请先连接 Tunnel:mind tunnel connect --token ") + click.echo(" 或设置环境变量: export MINDOS_JWT=") + sys.exit(1) + + from run_agent import AIAgent + + agent = AIAgent( + base_url=_get_cloud_gateway_url(), + api_key=jwt, + model=model or None, + enabled_toolsets=list(skills) if skills else None, + session_id=resume or None, + ) + + click.echo("🤖 Mind CLI (Cloud Gateway 模式)") + click.echo("=" * 50) + + # 交互循环 + conversation_history = [] + system_message = None + + while True: + try: + user_input = input("\n你: ").strip() + except (EOFError, KeyboardInterrupt): + click.echo("\n👋 再见!") + break + + if not user_input: + continue + if user_input.lower() in ("exit", "quit", "q"): + click.echo("👋 再见!") + break + + try: + result = agent.run_conversation( + user_message=user_input, + system_message=system_message, + conversation_history=conversation_history, + ) + # 更新会话历史 + if result.get("messages"): + conversation_history = result["messages"] + response = result.get("response", "") + if response: + click.echo(f"\n🤖 {response}") + except Exception as e: + click.echo(f"\n❌ 错误: {e}") + + +def _chat_offline(model, skills, resume): + """离线模式:走 vendor cli.py 的完整 TUI(本地 LLM 配置)。""" from cli import main as hermes_main - # 构建 Hermes CLI 参数 kwargs = {} if model: kwargs["model"] = model @@ -48,21 +156,42 @@ def chat(model, skills, resume): @click.option("--model", "-m", default="", help="模型名称") @click.option("--format", "fmt", default="text", help="输出格式: text/json/markdown") def ask(question, model, fmt): - """单次查询(= hermes -q)。""" - from cli import main as hermes_main + """单次查询(= hermes -q)。 - kwargs = {"query": question} - if model: - kwargs["model"] = model + 走 Cloud Gateway(JWT 计费)。 + """ + jwt = _get_jwt() + if not jwt: + click.echo("❌ 未找到 JWT,请先连接 Tunnel:mind tunnel connect --token ") + sys.exit(1) - hermes_main(**kwargs) + from run_agent import AIAgent + + agent = AIAgent( + base_url=_get_cloud_gateway_url(), + api_key=jwt, + model=model or None, + ) + + try: + result = agent.run_conversation(user_message=question) + response = result.get("response", "") + + if fmt == "json": + click.echo(json.dumps({"question": question, "answer": response}, + ensure_ascii=False, indent=2)) + elif fmt == "markdown": + click.echo(f"## Q: {question}\n\n{response}") + else: + click.echo(response) + except Exception as e: + click.echo(f"❌ 错误: {e}", err=True) + sys.exit(1) @main.command() def health(): """显示本地 CLI 健康状态。""" - import json - status = { "ok": True, "version": mindcli.__version__, @@ -160,6 +289,7 @@ def record_start(token, chat_id, source): """开始录音(独立音频源 → Cloud ASR)。 双工模式下,CLI 和浏览器各自独立推送,不做混音。 + 可多次调用不同 --source 启动多路录音(双工模式)。 """ import urllib.request body = json.dumps({"token": token, "chatId": chat_id, "source": source}).encode() @@ -182,19 +312,26 @@ def record_start(token, chat_id, source): @record.command(name="stop") -def record_stop(): +@click.option("--chat-id", default="", help="停止指定 chatId 的录音(默认停止全部)") +def record_stop(chat_id): """停止录音。""" import urllib.request + url = "http://127.0.0.1:8660/record/stop" + if chat_id: + url += f"?chatId={chat_id}" try: req = urllib.request.Request( - "http://127.0.0.1:8660/record/stop", + url, data=b"{}", headers={"Content-Type": "application/json"}, ) with urllib.request.urlopen(req, timeout=10) as resp: result = json.loads(resp.read()) if result.get("ok"): - click.echo(f"⏹️ 录音已停止 duration={result.get('duration')}s") + if "stopped" in result: + click.echo(f"⏹️ 已停止 {result['stopped']} 路录音") + else: + click.echo(f"⏹️ 录音已停止 duration={result.get('duration')}s") else: click.echo(f"❌ {result.get('error')}") except Exception as e: @@ -210,7 +347,9 @@ def record_status(): with urllib.request.urlopen(req, timeout=2) as resp: data = json.loads(resp.read()) if data.get("running"): - click.echo(f"🎙️ 录音中 duration={data.get('duration')}s chatId={data.get('chatId')}") + for cap in data.get("captures", []): + click.echo(f"🎙️ 录音中 duration={cap.get('duration')}s " + f"chatId={cap.get('chatId')} source={cap.get('source')}") else: click.echo("⏹️ 未在录音") except Exception: @@ -225,36 +364,117 @@ def update(): click.echo(f"当前版本: v{mindcli.__version__}") click.echo("正在检查最新版本...") - remoteVersion = _check_remote_version() - if remoteVersion and remoteVersion != mindcli.__version__: - click.echo(f"⬆️ 发现新版本 v{remoteVersion}") - click.echo("正在升级...") - installCmd = "pip install --upgrade git+https://git.brainwork.club/lidf/MindOS_CLI.git" - result = subprocess.run(installCmd.split(), capture_output=True, text=True) - if result.returncode == 0: - click.echo(f"✅ 已升级到 v{remoteVersion}") - click.echo(" 请重启 Mind CLI(mind start)以生效") - else: - click.echo(f"❌ 升级失败: {result.stderr[:200]}") - elif remoteVersion: - click.echo(f"✅ 已是最新版本 v{mindcli.__version__}") - else: + remote = _fetch_remote_info() + if not remote: click.echo("⚠️ 无法检查远端版本(网络问题?)") + return + + remoteVersion = remote.get("version") + if not remoteVersion: + click.echo("⚠️ 远端版本信息异常") + return + + if remoteVersion == mindcli.__version__: + click.echo(f"✅ 已是最新版本 v{mindcli.__version__}") + return + + # 防降级:远端版本低于本地时,不升级(服务器 versions.json 可能未同步) + if _compare_versions(remoteVersion, mindcli.__version__) < 0: + click.echo(f"⚠️ 本地 v{mindcli.__version__} 已高于远端 v{remoteVersion}(服务器版本未同步?)") + return + + click.echo(f"⬆️ 发现新版本 v{remoteVersion}") + if remote.get("releaseNotes"): + click.echo(f" {remote['releaseNotes']}") + click.echo("正在升级...") + + # 优先用远端下发的 upgradeCmd;无则本地检测安装方式 + installCmd = remote.get("upgradeCmd") or _detect_upgrade_command() + click.echo(f" 执行: {installCmd}") + + result = subprocess.run( + installCmd, + shell=True, # 含引号/管道,必须 shell=True + capture_output=True, + text=True, + ) + if result.returncode == 0: + click.echo(f"✅ 已升级到 v{remoteVersion}") + click.echo(" 请重启 Mind CLI(mind start)以生效") + else: + click.echo(f"❌ 升级失败: {result.stderr[:300]}") + click.echo(" 可手动执行: pipx reinstall mindos-cli") -def _check_remote_version(): - """从 versions.json 获取远端 CLI 最新版本号。""" +def _compare_versions(v1: str, v2: str) -> int: + """比较两个语义化版本号。返回 -1 (v1v2)。""" + def _parse(v: str): + parts = [] + for p in v.strip().split("."): + try: + parts.append(int(p)) + except ValueError: + parts.append(0) + return parts + a, b = _parse(v1), _parse(v2) + # 补齐长度 + while len(a) < len(b): + a.append(0) + while len(b) < len(a): + b.append(0) + for x, y in zip(a, b): + if x < y: + return -1 + if x > y: + return 1 + return 0 + + +def _detect_upgrade_command() -> str: + """检测当前安装方式,返回正确的升级命令。 + + pipx 安装 → pipx reinstall(从 git URL 重新安装到隔离 venv) + pip 安装 → python -m pip install --upgrade(用当前解释器的 pip) + """ + import sys + + # 检测是否运行在 pipx 的 venv 中 + # pipx venv 路径特征:包含 .local/share/pipx/venvs/<包名> + python_path = sys.executable + if "pipx/venvs" in python_path or "pipx\\venvs" in python_path: + return "pipx reinstall mindos-cli" + + # 默认 pip 升级(用 sys.executable -m pip,避免 PATH 里找不到 pip) + return f"{sys.executable} -m pip install --upgrade git+https://git.brainwork.club/lidf/MindOS_CLI.git" + + +def _fetch_remote_info() -> dict | None: + """从 versions.json 获取远端 CLI 版本信息。 + + 返回 {"version": str, "upgradeCmd": str|None, "releaseNotes": str|None} 或 None。 + """ import urllib.request versionsUrl = "https://dl.brainwork.club/mindos-next/versions.json" try: req = urllib.request.Request(versionsUrl) with urllib.request.urlopen(req, timeout=5) as resp: data = json.loads(resp.read()) - return data.get("cli", {}).get("version") + cli = data.get("cli", {}) + return { + "version": cli.get("version"), + "upgradeCmd": cli.get("upgradeCmd"), + "releaseNotes": cli.get("releaseNotes"), + } except Exception: return None +def _check_remote_version(): + """从 versions.json 获取远端 CLI 最新版本号(兼容旧接口)。""" + info = _fetch_remote_info() + return info.get("version") if info else None + + def _get_vendor_commit() -> str: """读取 _vendor/VENDOR_COMMIT 文件获取 vendor 版本。""" commitFile = os.path.join(mindcli._VENDOR_DIR, "VENDOR_COMMIT") diff --git a/mindcli/health.py b/mindcli/health.py index 40b415a..679fbde 100644 --- a/mindcli/health.py +++ b/mindcli/health.py @@ -21,16 +21,15 @@ import mindcli logger = logging.getLogger("mindcli.health") -# ── 全局状态(Phase 2 tunnel.py 会写入) ────────────────── +# ── 运行时状态(由调用方管线通过回调更新) ────────────── _tunnel_status = "disconnected" _tool_count = 0 +# ── 活跃录音句柄(按 chatId 索引,可多实例并存 = 双工模式)── +_active_captures: dict[str, "object"] = {} # chatId → CaptureHandle -def set_tunnel_status(status: str, tools: int = 0) -> None: - """由 tunnel.py 调用,更新隧道状态。""" - global _tunnel_status, _tool_count - _tunnel_status = status - _tool_count = tools +# ── Tunnel 句柄(由 /tunnel/activate 创建,单连接)──────── +_tunnel_handle = None # TunnelHandle | None # asyncio 事件循环(tunnel 需要) @@ -47,6 +46,17 @@ def _detect_capabilities() -> list[str]: return caps +def _on_tunnel_status(status: str, tools: int = 0) -> None: + """Tunnel 状态变更回调(由 TunnelHandle 通过 on_status 调用)。 + + 替代旧版 tunnel.py 反向 import health.set_tunnel_status 的全局变量写入, + 实现单向数据流:Tunnel → 回调 → health 局部状态。 + """ + global _tunnel_status, _tool_count + _tunnel_status = status + _tool_count = tools + + class _HealthHandler(BaseHTTPRequestHandler): """处理 /health 和 /tunnel/activate 请求。""" @@ -64,18 +74,21 @@ class _HealthHandler(BaseHTTPRequestHandler): }, ensure_ascii=False) self._respond(200, body) elif self.path == "/tunnel/status": - from mindcli.tunnel import get_tunnel_client - client = get_tunnel_client() body = json.dumps({ - "status": client.status, - "userId": client.user_id, + "status": _tunnel_handle.status if _tunnel_handle else "disconnected", + "userId": _tunnel_handle.user_id if _tunnel_handle else None, "tools": _tool_count, }) self._respond(200, body) elif self.path == "/record/status": try: - from mindcli.recorder import get_recorder - body = json.dumps(get_recorder().status(), ensure_ascii=False) + # 遍历所有活跃录音句柄(双工模式下可有多路) + if _active_captures: + captures = [h.status() for h in _active_captures.values()] + else: + captures = [] + body = json.dumps({"running": len(captures) > 0, "captures": captures}, + ensure_ascii=False) self._respond(200, body) except Exception as e: self._respond(500, json.dumps({"error": str(e)})) @@ -84,11 +97,13 @@ class _HealthHandler(BaseHTTPRequestHandler): def do_POST(self): """处理 POST 请求。""" - if self.path == "/tunnel/activate": + # 去掉 query string 后再匹配路由(/record/stop?chatId=xxx → /record/stop) + path = self.path.split("?")[0] + if path == "/tunnel/activate": self._handle_tunnel_activate() - elif self.path == "/record/start": + elif path == "/record/start": self._handle_record_start() - elif self.path == "/record/stop": + elif path == "/record/stop": self._handle_record_stop() else: self._respond(404, json.dumps({"error": "Not Found"})) @@ -107,14 +122,24 @@ class _HealthHandler(BaseHTTPRequestHandler): self._respond(400, json.dumps({"error": "Missing token or tunnelUrl"})) return - # 在 asyncio 事件循环中启动 tunnel - from mindcli.tunnel import get_tunnel_client - client = get_tunnel_client() + # 在 asyncio 事件循环中启动 tunnel(无状态管线 + 回调) + from mindcli.pipelines.tunnel_session import connect as tunnel_connect if _loop and _loop.is_running(): - future = asyncio.run_coroutine_threadsafe( - client.activate(token, tunnel_url), _loop - ) + # 若已有旧 handle,先断开 + global _tunnel_handle + async def _activate(): + global _tunnel_handle + if _tunnel_handle: + await _tunnel_handle.disconnect() + _tunnel_handle = await tunnel_connect( + url=tunnel_url, + jwt=token, + on_status=_on_tunnel_status, + ) + return {"ok": True, "status": "connecting"} + + future = asyncio.run_coroutine_threadsafe(_activate(), _loop) result = future.result(timeout=5) else: result = {"ok": True, "status": "no_event_loop"} @@ -146,19 +171,21 @@ class _HealthHandler(BaseHTTPRequestHandler): f"?token={token}&chatId={chat_id}&meetingId={meeting_id}&source=system" ) - from mindcli.recorder import get_recorder - recorder = get_recorder() source = data.get("source", "system") # "system" 或 "mic" if _loop and _loop.is_running(): + from mindcli.pipelines.audio_capture import capture as capture_audio future = asyncio.run_coroutine_threadsafe( - recorder.start( + capture_audio( ws_url=ws_base, chat_id=chat_id, meeting_id=meeting_id, source=source, ), _loop, ) - result = future.result(timeout=10) + handle = future.result(timeout=10) + # 存入 _active_captures,按 chatId 索引(双工模式可多路并存) + _active_captures[chat_id or meeting_id] = handle + result = {"ok": True, "meetingId": handle.meeting_id, "source": source} else: result = {"error": "事件循环未运行,请先 mind start"} @@ -170,12 +197,32 @@ class _HealthHandler(BaseHTTPRequestHandler): def _handle_record_stop(self): """停止本地录音。""" try: - from mindcli.recorder import get_recorder - recorder = get_recorder() + # 支持 /record/stop?chatId=xxx 停止单路;无参则停止所有 + from urllib.parse import urlparse, parse_qs + parsed = urlparse(self.path) + qs = parse_qs(parsed.query) + target_chat = qs.get("chatId", [None])[0] if _loop and _loop.is_running(): - future = asyncio.run_coroutine_threadsafe(recorder.stop(), _loop) - result = future.result(timeout=10) + if target_chat: + # 停止指定 chatId 的单路录音 + handle = _active_captures.pop(target_chat, None) + if handle: + future = asyncio.run_coroutine_threadsafe(handle.stop(), _loop) + result = future.result(timeout=10) + else: + result = {"error": f"无活跃录音 chatId={target_chat}"} + elif _active_captures: + # 停止所有活跃录音(全局 [停止录音]) + results = [] + for cid in list(_active_captures.keys()): + handle = _active_captures.pop(cid, None) + if handle: + future = asyncio.run_coroutine_threadsafe(handle.stop(), _loop) + results.append(future.result(timeout=10)) + result = {"ok": True, "stopped": len(results), "results": results} + else: + result = {"error": "未在录音"} else: result = {"error": "事件循环未运行"} @@ -214,7 +261,7 @@ class _ThreadedHTTPServer(ThreadingMixIn, HTTPServer): def _get_vendor_commit() -> str: - commit_file = os.path.join(mindcli._VENDOR_DIR, "HERMES_COMMIT") + commit_file = os.path.join(mindcli._VENDOR_DIR, "VENDOR_COMMIT") try: with open(commit_file) as f: return f.read().strip() diff --git a/mindcli/pipelines/__init__.py b/mindcli/pipelines/__init__.py new file mode 100644 index 0000000..8e6573c --- /dev/null +++ b/mindcli/pipelines/__init__.py @@ -0,0 +1,10 @@ +""" +Mind CLI 无状态管线层。 + +参照 hermes-overlay/infra/pipelines/anyfile2md.py 的分层模式: + - 管线层只做编排,不做逻辑。无状态:不写 DB、不推 SSE、不管 session。 + - 状态化职责(生命周期管理、端口绑定、状态聚合)下沉到调用方(health.py)。 + +每个模块的入口是工厂函数(capture / connect / execute),返回轻量句柄, +调用方持有句柄并管理其生命周期。 +""" diff --git a/mindcli/recorder.py b/mindcli/pipelines/audio_capture.py similarity index 75% rename from mindcli/recorder.py rename to mindcli/pipelines/audio_capture.py index 8f8a054..1e41185 100644 --- a/mindcli/recorder.py +++ b/mindcli/pipelines/audio_capture.py @@ -1,5 +1,5 @@ """ -Mind CLI — 系统拾音引擎 (Phase 4) +Mind CLI — 系统拾音管线(无状态)。 双层架构·音频服务层: CLI 作为音频服务层的一个独立源,只负责采集系统音频。 @@ -11,6 +11,10 @@ Mind CLI — 系统拾音引擎 (Phase 4) 不做混音。混音 = 伪需求。"关联两份转写"是 Agent 层的智力工作。 +无状态:不持有进程级单例、不反向 import 调用方模块。 +每次 capture() 返回独立 CaptureHandle,可多实例并存(双工模式)。 +状态化职责(哪些 handle 在跑、何时 stop)由调用方(health.py)管理。 + Cloud 端复用 dashscope_realtime.py 管线,零新增代码。 协议与前端 RecordingService 完全一致: 客户端→服务端: binary PCM16 帧 / {"type":"stop"} @@ -24,58 +28,104 @@ import threading import time from typing import Callable, Literal -logger = logging.getLogger("mindcli.recorder") +logger = logging.getLogger("mindcli.pipelines.audio_capture") # ── 常量 ────────────────────────────────────────────────── TARGET_SAMPLE_RATE = 16000 # Cloud ASR 要求 16kHz CHUNK_DURATION_MS = 100 # 每帧 100ms CHUNK_SAMPLES = TARGET_SAMPLE_RATE * CHUNK_DURATION_MS // 1000 # 1600 -# ── 全局单例 ────────────────────────────────────────────── -_recorder: "SystemRecorder | None" = None - -def get_recorder() -> "SystemRecorder": - """获取全局 SystemRecorder 单例。""" - global _recorder - if _recorder is None: - _recorder = SystemRecorder() - return _recorder - - -class SystemRecorder: +async def capture( + ws_url: str, + chat_id: str = "", + meeting_id: str = "", + source: Literal["system", "mic"] = "system", + on_text: Callable[[str, str], None] | None = None, +) -> "CaptureHandle": """ - 系统拾音引擎 — 音频服务层的一个独立源。 + 启动一路音频采集,返回独立句柄。 - 只负责单一音频源的采集 + WS 推送。 - 不做混音(双工模式下浏览器和 CLI 各自独立推送到 Cloud ASR)。 + 无单例、无互斥。可多次调用,每次返回独立 handle, + 双工模式 = 两个 handle 并存(system + mic 各一路)。 + + Args: + ws_url: Cloud ASR WebSocket URL(含 token/chatId/meetingId query) + chat_id: 对话 ID + meeting_id: 录音批次 ID + source: "system"(ScreenCaptureKit)或 "mic"(sounddevice) + on_text: 收到转写文本的回调 (type, text) + + Returns: + CaptureHandle(已启动采集) + + Raises: + RuntimeError: WebSocket 连接失败或音频源启动失败 + """ + handle = CaptureHandle( + ws_url=ws_url, + chat_id=chat_id, + meeting_id=meeting_id or f"cli_rec_{int(time.time() * 1000)}", + source=source, + on_text=on_text, + ) + await handle._start() + return handle + + +class CaptureHandle: + """ + 一路音频采集的句柄。生命周期由调用方管理。 + + 非单例——双工模式下可同时存在多个 CaptureHandle 实例, + 各自持有独立的 WS 连接、音频缓冲区、采集资源。 """ - def __init__(self): + def __init__( + self, + ws_url: str, + chat_id: str, + meeting_id: str, + source: str, + on_text: Callable[[str, str], None] | None, + ): + self._ws_url = ws_url + self._chat_id = chat_id + self._meeting_id = meeting_id + self._source = source + self._on_text = on_text + self._running = False self._ws = None - self._source: str = "system" self._start_time = 0.0 - self._chat_id = "" - self._meeting_id = "" + # 音频缓冲区(线程安全) self._audio_buf: bytearray = bytearray() self._buf_lock = threading.Lock() + # 采集资源 self._mic_stream = None # sounddevice.InputStream self._sc_stream = None # SCStream self._sc_delegate = None + # 推送线程 self._push_thread: threading.Thread | None = None + # asyncio 事件循环引用(start 时保存) self._loop: asyncio.AbstractEventLoop | None = None - # 事件回调 - self._on_text: Callable[[str, str], None] | None = None @property def is_running(self) -> bool: return self._running + @property + def chat_id(self) -> str: + return self._chat_id + + @property + def meeting_id(self) -> str: + return self._meeting_id + def status(self) -> dict: """返回当前录音状态。""" return { @@ -86,57 +136,31 @@ class SystemRecorder: "meetingId": self._meeting_id, } - async def start( - self, - ws_url: str, - chat_id: str = "", - meeting_id: str = "", - source: Literal["system", "mic"] = "system", - on_text: Callable[[str, str], None] | None = None, - ) -> dict: - """ - 开始录音(单一源)。 - - Args: - ws_url: Cloud ASR WebSocket URL(含 token/chatId/meetingId query) - chat_id: 对话 ID - meeting_id: 录音批次 ID - source: "system"(ScreenCaptureKit)或 "mic"(sounddevice) - on_text: 收到转写文本的回调 (type, text) - - Returns: - {"ok": True, "meetingId": "..."} 或 {"error": "..."} - """ - if self._running: - return {"error": "录音已在进行中"} - - self._chat_id = chat_id - self._meeting_id = meeting_id or f"cli_rec_{int(time.time() * 1000)}" - self._source = source - self._on_text = on_text + async def _start(self) -> dict: + """启动采集(由 capture() 调用)。""" self._audio_buf.clear() self._loop = asyncio.get_running_loop() # 1. 连接 Cloud ASR WebSocket try: import websockets - self._ws = await websockets.connect(ws_url) - logger.info("[Recorder] WS 已连接: %s", ws_url[:80]) + self._ws = await websockets.connect(self._ws_url) + logger.info("[AudioCapture] WS 已连接: %s", self._ws_url[:80]) except Exception as e: - logger.error("[Recorder] WS 连接失败: %s", e) - return {"error": f"WebSocket 连接失败: {e}"} + logger.error("[AudioCapture] WS 连接失败: %s", e) + raise RuntimeError(f"WebSocket 连接失败: {e}") # 2. 启动音频采集 try: - if source == "system": + if self._source == "system": self._start_system_audio() else: self._start_mic() except Exception as e: - logger.error("[Recorder] 音频源 '%s' 启动失败: %s", source, e) + logger.error("[AudioCapture] 音频源 '%s' 启动失败: %s", self._source, e) await self._ws.close() self._ws = None - return {"error": f"音频源启动失败: {e}"} + raise RuntimeError(f"音频源启动失败: {e}") # 3. 启动推送线程 self._running = True @@ -147,9 +171,9 @@ class SystemRecorder: # 4. 启动 WS 接收协程(转写结果) asyncio.create_task(self._ws_recv_loop()) - logger.info("[Recorder] 录音已开始 source=%s chatId=%s meetingId=%s", - source, chat_id, self._meeting_id) - return {"ok": True, "meetingId": self._meeting_id, "source": source} + logger.info("[AudioCapture] 录音已开始 source=%s chatId=%s meetingId=%s", + self._source, self._chat_id, self._meeting_id) + return {"ok": True, "meetingId": self._meeting_id, "source": self._source} async def stop(self) -> dict: """停止录音。""" @@ -176,7 +200,7 @@ class SystemRecorder: pass self._ws = None - logger.info("[Recorder] 录音已停止 duration=%.1fs", duration) + logger.info("[AudioCapture] 录音已停止 duration=%.1fs", duration) return {"ok": True, "duration": duration, "meetingId": self._meeting_id} # ── 麦克风采集(sounddevice)──────────────────────────── @@ -187,7 +211,7 @@ class SystemRecorder: def _callback(indata, frames, time_info, status): if status: - logger.debug("[Recorder] mic status: %s", status) + logger.debug("[AudioCapture] mic status: %s", status) with self._buf_lock: self._audio_buf.extend(indata.tobytes()) @@ -199,7 +223,7 @@ class SystemRecorder: callback=_callback, ) self._mic_stream.start() - logger.info("[Recorder] 麦克风已启动 @%dHz", TARGET_SAMPLE_RATE) + logger.info("[AudioCapture] 麦克风已启动 @%dHz", TARGET_SAMPLE_RATE) # ── 系统音频采集(ScreenCaptureKit)───────────────────── @@ -244,7 +268,7 @@ class SystemRecorder: # 创建 delegate _ensure_delegate_class() self._sc_delegate = _SCStreamDelegate.alloc().init() - self._sc_delegate._recorder = self + self._sc_delegate._handle = self # ← delegate 持有 handle 引用 # 创建 stream self._sc_stream = SCStream.alloc().initWithFilter_configuration_delegate_( @@ -272,7 +296,7 @@ class SystemRecorder: if error_holder[0]: raise RuntimeError(f"SCStream 启动失败: {error_holder[0]}") - logger.info("[Recorder] 系统音频已启动 (ScreenCaptureKit @%dHz)", TARGET_SAMPLE_RATE) + logger.info("[AudioCapture] 系统音频已启动 (ScreenCaptureKit @%dHz)", TARGET_SAMPLE_RATE) def _on_system_audio(self, raw_bytes: bytes): """系统音频回调。SCStream 输出 float32 PCM,需要转为 int16。""" @@ -340,13 +364,13 @@ class SystemRecorder: asyncio.run_coroutine_threadsafe(ws.send(chunk), self._loop) send_count += 1 except Exception as e: - logger.debug("[Recorder] WS send err: %s", e) + logger.debug("[AudioCapture] WS send err: %s", e) break offset += frame_bytes # 每 ~5s 打一次发送统计 if send_count > 0 and send_count % 50 == 0: - logger.info("[Recorder] WS sent %d frames (%.1fs)", + logger.info("[AudioCapture] WS sent %d frames (%.1fs)", send_count, send_count * CHUNK_DURATION_MS / 1000) async def _ws_recv_loop(self): @@ -358,16 +382,16 @@ class SystemRecorder: msg_type = data.get("type", "") text = data.get("text", "") if msg_type in ("partial", "final") and text: - logger.info("[Recorder] %s: %s", msg_type, text[:50]) + logger.info("[AudioCapture] %s: %s", msg_type, text[:50]) if self._on_text: self._on_text(msg_type, text) elif msg_type == "error": - logger.error("[Recorder] ASR error: %s", data.get("message")) + logger.error("[AudioCapture] ASR error: %s", data.get("message")) except (json.JSONDecodeError, TypeError): pass except Exception as e: if self._running: - logger.warning("[Recorder] WS recv 断开: %s", e) + logger.warning("[AudioCapture] WS recv 断开: %s", e) # ── 工具函数 ────────────────────────────────────────────── @@ -381,7 +405,7 @@ def _sync_get_sharable_content(): def _handler(content, error): if error: - logger.error("[Recorder] SCShareableContent error: %s", error) + logger.error("[AudioCapture] SCShareableContent error: %s", error) result[0] = content event.set() @@ -406,12 +430,12 @@ def _define_sc_delegate(): class _Delegate(NSObject): """接收 SCStream 音频样本的 delegate。""" - _recorder = None + _handle = None # ← CaptureHandle 引用(替代旧 _recorder) def stream_didOutputSampleBuffer_ofType_(self, stream, sample_buffer, output_type): if output_type != SCStreamOutputTypeAudio: return - if not self._recorder or not self._recorder.is_running: + if not self._handle or not self._handle.is_running: return try: @@ -422,9 +446,9 @@ def _define_sc_delegate(): # CMBlockBufferCopyDataBytes 返回 (OSStatus, bytes_data) status, raw_data = CoreMedia.CMBlockBufferCopyDataBytes(block_buf, 0, length, None) if status == 0 and raw_data: - self._recorder._on_system_audio(raw_data) + self._handle._on_system_audio(raw_data) except Exception as e: - logger.warning("[Recorder] SCStream sample error: %s", e, exc_info=True) + logger.warning("[AudioCapture] SCStream sample error: %s", e, exc_info=True) return _Delegate diff --git a/mindcli/managed_mcp.py b/mindcli/pipelines/tool_proxy.py similarity index 82% rename from mindcli/managed_mcp.py rename to mindcli/pipelines/tool_proxy.py index 415f9e5..9980e81 100644 --- a/mindcli/managed_mcp.py +++ b/mindcli/pipelines/tool_proxy.py @@ -1,8 +1,11 @@ """ -Mind CLI — Managed MCP 治理层。 +Mind CLI — 工具代理管线(无状态)。 在 _vendor/tools/ 之上加白名单过滤。Cloud 审批通过的工具才能执行。 Managed 模式下:只允许 approved_tools 列表中的工具。 + +无状态:不持有进程级单例、不反向 import 调用方模块。 +白名单 set 由调用方传入,生命周期由调用方管理(通常是 TunnelHandle 持有)。 """ import asyncio @@ -10,23 +13,26 @@ import logging import os import subprocess import sys -from typing import Any +import shlex +from typing import Any, Callable -logger = logging.getLogger("mindcli.managed_mcp") +logger = logging.getLogger("mindcli.pipelines.tool_proxy") -class ManagedMCP: +class ToolProxy: """ 治理层:只暴露 Cloud 审批通过的工具。 Cloud 通过 Tunnel 握手下发 approved_tools 白名单, - 后续 tool_call 请求先过白名单检查,再委托到 _vendor/tools/ 执行。 + 后续 tool_call 请求先过白名单检查,再委托到内置执行器执行。 + + 非单例——可被任意调用方构造和持有。 """ def __init__(self, approved_tools: list[str] | None = None): self._approved: set[str] = set(approved_tools or []) # 工具名 → 执行函数的映射 - self._executors: dict[str, Any] = {} + self._executors: dict[str, Callable] = {} self._register_executors() def update_approved(self, tools: list[str]) -> None: @@ -36,9 +42,9 @@ class ManagedMCP: added = self._approved - old removed = old - self._approved if added: - logger.info("[ManagedMCP] 新增审批工具: %s", added) + logger.info("[ToolProxy] 新增审批工具: %s", added) if removed: - logger.info("[ManagedMCP] 移除审批工具: %s", removed) + logger.info("[ToolProxy] 移除审批工具: %s", removed) def is_approved(self, tool_name: str) -> bool: """检查工具是否在白名单中。""" @@ -56,7 +62,7 @@ class ManagedMCP: {"output": "...", "exit_code": 0} 或 {"error": "..."} """ if not self.is_approved(tool_name): - logger.warning("[ManagedMCP] 工具 '%s' 未审批,拒绝执行", tool_name) + logger.warning("[ToolProxy] 工具 '%s' 未审批,拒绝执行", tool_name) return {"error": f"Tool '{tool_name}' not approved by Cloud"} executor = self._executors.get(tool_name) @@ -67,7 +73,7 @@ class ManagedMCP: result = await executor(params) return result except Exception as e: - logger.error("[ManagedMCP] 工具 '%s' 执行异常: %s", tool_name, e) + logger.error("[ToolProxy] 工具 '%s' 执行异常: %s", tool_name, e) return {"error": f"Execution failed: {str(e)}"} def _register_executors(self) -> None: @@ -141,7 +147,7 @@ class ManagedMCP: if not pattern: return {"error": "No pattern specified"} - cmd = f"grep -rn {_shell_quote(pattern)} {_shell_quote(path)}" + cmd = f"grep -rn {shlex.quote(pattern)} {shlex.quote(path)}" return await self._exec_terminal({"command": cmd, "timeout": 15}) async def _exec_file_ops(self, params: dict) -> dict: @@ -151,13 +157,13 @@ class ManagedMCP: dst = params.get("destination", "") if op == "copy": - cmd = f"cp -r {_shell_quote(src)} {_shell_quote(dst)}" + cmd = f"cp -r {shlex.quote(src)} {shlex.quote(dst)}" elif op == "move": - cmd = f"mv {_shell_quote(src)} {_shell_quote(dst)}" + cmd = f"mv {shlex.quote(src)} {shlex.quote(dst)}" elif op == "delete": - cmd = f"rm -rf {_shell_quote(src)}" + cmd = f"rm -rf {shlex.quote(src)}" elif op == "list": - cmd = f"ls -la {_shell_quote(src)}" + cmd = f"ls -la {shlex.quote(src)}" else: return {"error": f"Unknown operation: {op}"} @@ -172,12 +178,6 @@ class ManagedMCP: return {"error": f"Unsupported language: {language}"} return await self._exec_terminal({ - "command": f"{sys.executable} -c {_shell_quote(code)}", + "command": f"{sys.executable} -c {shlex.quote(code)}", "timeout": 30, }) - - -def _shell_quote(s: str) -> str: - """简单的 shell 参数转义。""" - import shlex - return shlex.quote(s) diff --git a/mindcli/tunnel.py b/mindcli/pipelines/tunnel_session.py similarity index 65% rename from mindcli/tunnel.py rename to mindcli/pipelines/tunnel_session.py index 6142a4c..e81db28 100644 --- a/mindcli/tunnel.py +++ b/mindcli/pipelines/tunnel_session.py @@ -1,18 +1,20 @@ """ -Mind CLI — WebSocket Tunnel 客户端。 +Mind CLI — WebSocket Tunnel 会话管线(无状态)。 连接到 Cloud 端 mindcli_bridge,接收工具调用指令并在本地执行。 采用 Browser-Donated JWT 认证:浏览器授权 CLI,CLI 不需独立认证。 + +无状态:不持有进程级单例、不反向 import 调用方模块(health.py)。 +状态变更通过 on_status 回调通知调用方,工具调用通过 on_dispatch 回调派发。 +生命周期由调用方(health.py)管理。 """ import asyncio import json import logging -import os -import time -from typing import Any +from typing import Callable -logger = logging.getLogger("mindcli.tunnel") +logger = logging.getLogger("mindcli.pipelines.tunnel_session") # 连接状态 DISCONNECTED = "disconnected" @@ -20,24 +22,60 @@ CONNECTING = "connecting" CONNECTED = "connected" -class TunnelClient: +async def connect( + url: str, + jwt: str, + on_dispatch: Callable[[dict], None] | None = None, + on_status: Callable[[str, int], None] | None = None, +) -> "TunnelHandle": """ - CLI → Cloud WebSocket 隧道。 + 建立 Tunnel 连接,返回句柄。 + + Args: + url: Cloud Tunnel WebSocket URL + jwt: MindPass JWT(浏览器提供) + on_dispatch: 工具调用派发回调(可选,默认走内部 ToolProxy) + on_status: 状态变更回调 (status_str, tool_count) + + Returns: + TunnelHandle(已在后台启动连接循环) + """ + handle = TunnelHandle(url, jwt, on_dispatch, on_status) + handle._start_connect_loop() + return handle + + +class TunnelHandle: + """ + CLI → Cloud WebSocket 隧道句柄。 + + 非单例——生命周期由调用方管理。 生命周期: - 1. 浏览器 POST /tunnel/activate → 提供 JWT + tunnelUrl - 2. TunnelClient.connect() → WebSocket 握手 + 能力协商 - 3. 消息循环:接收 tool_call → ManagedMCP 执行 → 返回结果 + 1. 调用方调 connect() → 后台启动 _connect_loop + 2. 握手 + 能力协商 → Cloud 下发 approved_tools → 创建 ToolProxy + 3. 消息循环:接收 tool_call → ToolProxy 执行 → 返回结果 4. 心跳维持 30s / 断线指数退避重连 """ - def __init__(self): + def __init__( + self, + url: str, + jwt: str, + on_dispatch: Callable[[dict], None] | None = None, + on_status: Callable[[str, int], None] | None = None, + ): + self._tunnel_url = url + self._jwt = jwt + self._on_dispatch = on_dispatch + self._on_status = on_status + self._status = DISCONNECTED self._ws = None - self._jwt: str | None = None - self._tunnel_url: str | None = None self._user_id: str | None = None - self._managed_mcp = None + self._tool_proxy = None # ToolProxy 实例,握手成功后创建 + + # 后台任务 self._reconnect_task: asyncio.Task | None = None self._heartbeat_task: asyncio.Task | None = None self._message_task: asyncio.Task | None = None @@ -47,9 +85,6 @@ class TunnelClient: self._max_reconnect_delay = 30.0 self._reconnect_attempts = 0 - # 回调:通知 health.py 更新状态 - self._on_status_change = None - @property def status(self) -> str: return self._status @@ -58,21 +93,21 @@ class TunnelClient: def user_id(self) -> str | None: return self._user_id - def set_status_callback(self, callback) -> None: - """设置状态变更回调(由 health.py 注册)。""" - self._on_status_change = callback - - def _set_status(self, status: str) -> None: + def _set_status(self, status: str, tools: int = 0) -> None: self._status = status - if self._on_status_change: - self._on_status_change(status) + if self._on_status: + self._on_status(status, tools) + + def _start_connect_loop(self) -> None: + """在当前 event loop 中后台启动连接循环。""" + self._reconnect_task = asyncio.create_task(self._connect_loop()) async def activate(self, jwt: str, tunnel_url: str) -> dict: """ - 浏览器授权激活 Tunnel。 + 更新 JWT + URL 并重新连接(由调用方在浏览器重新授权时调用)。 Args: - jwt: MindPass JWT(浏览器提供) + jwt: 新的 MindPass JWT tunnel_url: Cloud Tunnel WebSocket URL Returns: @@ -84,9 +119,8 @@ class TunnelClient: # 取消旧连接 await self.disconnect() - # 后台启动连接 + # 后台启动新连接 self._reconnect_task = asyncio.create_task(self._connect_loop()) - return {"ok": True, "status": "connecting"} async def disconnect(self) -> None: @@ -112,6 +146,11 @@ class TunnelClient: self._reconnect_delay = 1.0 logger.info("[Tunnel] 已断开") + def update_approved(self, tools: list[str]) -> None: + """热更新工具白名单(Cloud 下发时调用)。""" + if self._tool_proxy: + self._tool_proxy.update_approved(tools) + async def _connect_loop(self) -> None: """连接循环:握手 → 消息循环 → 断线重连。""" while True: @@ -173,17 +212,16 @@ class TunnelClient: # 接收审批结果 raw = await asyncio.wait_for(self._ws.recv(), timeout=10) approval = json.loads(raw) + approved: list[str] = [] if approval.get("type") == "approved_tools": approved = approval.get("tools", []) - # 初始化 ManagedMCP - from mindcli.managed_mcp import ManagedMCP - self._managed_mcp = ManagedMCP(approved_tools=approved) + # 初始化 ToolProxy(从 pipelines.tool_proxy 导入,无状态) + from mindcli.pipelines.tool_proxy import ToolProxy + self._tool_proxy = ToolProxy(approved_tools=approved) logger.info("[Tunnel] 审批通过工具: %s", approved) - # 更新 health 状态 - from mindcli.health import set_tunnel_status - set_tunnel_status("connected", len(approved) if approval.get("type") == "approved_tools" else 0) - self._set_status(CONNECTED) + # ★ 通过回调通知调用方状态(不反向 import health.py) + self._set_status(CONNECTED, len(approved)) async def _run(self) -> None: """主消息循环:接收 Cloud 指令 → 本地执行 → 返回结果。""" @@ -195,8 +233,8 @@ class TunnelClient: await self._handle_tool_call(msg) elif msg.get("type") == "approved_tools": # 热更新白名单 - if self._managed_mcp: - self._managed_mcp.update_approved(msg.get("tools", [])) + if self._tool_proxy: + self._tool_proxy.update_approved(msg.get("tools", [])) elif msg.get("type") == "ping": await self._ws.send(json.dumps({"type": "pong"})) else: @@ -214,10 +252,13 @@ class TunnelClient: logger.info("[Tunnel] 工具调用: %s (id=%s)", tool_name, call_id) - if not self._managed_mcp: - result = {"error": "ManagedMCP not initialized"} + if self._on_dispatch: + # 调用方自定义派发 + result = self._on_dispatch(msg) + elif not self._tool_proxy: + result = {"error": "ToolProxy not initialized"} else: - result = await self._managed_mcp.execute(tool_name, tool_args) + result = await self._tool_proxy.execute(tool_name, tool_args) # 截断过大的输出(防止 WS 阻塞) output = result.get("output", "") @@ -231,12 +272,3 @@ class TunnelClient: "result": result, } await self._ws.send(json.dumps(response)) - - -# ── 全局单例 ────────────────────────────────────────── -_tunnel_client = TunnelClient() - - -def get_tunnel_client() -> TunnelClient: - """获取全局 TunnelClient 实例。""" - return _tunnel_client diff --git a/pyproject.toml b/pyproject.toml index d380532..956da1d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mindos-cli" -version = "0.1.0" +version = "0.2.0" description = "MindOS NEXT 本地执行体 — Cloud Hermes 的受管理执行节点" readme = "README.md" requires-python = ">=3.10" diff --git a/versions.json b/versions.json new file mode 100644 index 0000000..8cfb222 --- /dev/null +++ b/versions.json @@ -0,0 +1,27 @@ +{ + "apk": { + "version": "3.2.0", + "downloadUrl": "https://dl.brainwork.club/mindos-next/MindOS-NEXT-latest.apk", + "releaseDate": "2026-06-19", + "releaseNotes": "v3.2.0 录音架构重构:APK 本地录制+上传+说话人分离" + }, + "crx": { + "version": "3.1.2", + "downloadUrl": "https://dl.brainwork.club/mindos-next/MindOS-NEXT-Clipper-latest.zip", + "releaseDate": "2026-04-29", + "releaseNotes": "V3.1.2:版本检查 + 更新提示 + alarms 定时检测" + }, + "cli": { + "version": "0.2.0", + "installCmd": "pipx install \"mindos-cli[audio] @ git+https://git.brainwork.club/lidf/MindOS_CLI.git\"", + "upgradeCmd": "pipx reinstall mindos-cli", + "releaseDate": "2026-07-01", + "releaseNotes": "v0.2.0 无状态原子化改造:pipelines/ 分层 + 双工录音 + Tunnel 回调解耦 + chat/ask 走 Cloud Gateway + pipx 自动更新修复" + }, + "hap": { + "version": "0.0.1", + "downloadUrl": "", + "releaseDate": "", + "releaseNotes": "开发中" + } +}