MindOS_CLI/docs/SPEC_mindcli_atomization.md
lidf aecd81ec8b feat: 无状态原子化改造 v0.2.0 — pipelines/ 分层 + 双工录音 + pipx 自动更新修复
## 架构改造

将 recorder/tunnel/managed_mcp 三个有状态单例模块拆成
mindcli/pipelines/ 下的无状态管线 + health.py 调用方持有状态。
参照 hermes-overlay/infra/pipelines/anyfile2md.py 的分层模式。

### pipelines/ 层(无状态)
- audio_capture.py: capture() 工厂函数返回独立 CaptureHandle,
  无单例无互斥,双工模式(system+mic 并行)在代码层面可用
- tunnel_session.py: connect() 工厂函数 + on_status 回调,
  消除 health ⇄ tunnel 循环耦合(单向数据流)
- tool_proxy.py: ToolProxy 替代 ManagedMCP,非单例

### health.py 改造
- _active_captures dict 按 chatId 索引,可多实例并存
- _tunnel_handle 由调用方持有,on_status 回调更新状态
- /record/stop 支持 ?chatId= 停单路或全停
- /record/status 返回所有活跃录音列表

### cli.py 改造
- chat/ask 走 run_agent headless + Cloud Gateway JWT(铁律 A)
- 保留 --offline 走 vendor TUI(铁律 C:断开即自治)
- mind update 修复 pipx 场景:
  - 检测 pipx venv → pipx reinstall
  - 非 pipx → sys.executable -m pip(修复 venv 里 pip 找不到)
  - 防降级保护(远端版本低于本地时不升级)
  - 远端 upgradeCmd 字段下发

### 顺手修复
- health.py / capability.py 的 HERMES_COMMIT → VENDOR_COMMIT
- 版本号 0.1.0 → 0.2.0(__init__.py + pyproject.toml)
- 新增 versions.json 仓库模板(installCmd 改为 pipx,新增 upgradeCmd)

### 删除
- recorder.py → 逻辑迁入 pipelines/audio_capture.py
- tunnel.py → 逻辑迁入 pipelines/tunnel_session.py
- managed_mcp.py → 逻辑迁入 pipelines/tool_proxy.py

SPEC: docs/SPEC_mindcli_atomization.md
2026-07-01 14:56:16 +08:00

599 lines
23 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# SPEC: Mind CLI 无状态原子化改造
> **版本**: v1.0
> **日期**: 2026-07-01
> **状态**: Draft — 待确认后执行
> **适用范围**: MindOS_CLI (`mindcli/` 薄壳层,不含 `_vendor/`)
> **前置依赖**: `SPEC_mindos_next_cli.md` v2.3(设计哲学与三铁律)
> **参照模式**: `hermes-overlay/infra/pipelines/anyfile2md.py`(无状态管线组合器)
---
## 一、改造动机
### 1.1 当前状态:有状态单体
`mindcli/` 薄壳层7 个文件 / 1633 行)当前是**进程级有状态单体**,具体表现:
| 模块 | 状态性 | 行数 | 问题 |
|:---|:---|---:|:---|
| `recorder.py` | 模块级单例 `_recorder` + `_running` 互斥 | 436 | **一进程只能一路录音**,双工模式不可用 |
| `tunnel.py` | 模块级单例 `_tunnel_client`,反向写 `health._tunnel_status` | 242 | 循环耦合,状态经全局变量传递 |
| `health.py` | 模块级全局 `_loop` / `_tunnel_status` / `_tool_count` | 278 | 状态枢纽 + 端口 8660 绑定 |
| `managed_mcp.py` | 实例级状态,经 tunnel 单例间接唯一 | 183 | 随 tunnel 存活,无法独立测试 |
| `cli.py` | 自身无状态,但 `mind chat` 拉起 vendor TUI | 272 | `_vendor/cli.py` 有 7 个模块级全局 + `sys.exit()` |
| `capability.py` | 无状态纯函数 | 79 | ✅ 已达标 |
| `service.py` | 无状态纯函数 | 116 | ✅ 已达标 |
### 1.2 与设计目标的偏差
对照 `SPEC_mindos_next_cli.md` 的三铁律:
| 铁律 | 设计要求 | 当前实现 | 偏差 |
|:---|:---|:---|:---:|
| **A决策权归云端** | CLI 的 LLM 调用 100% 走 Cloud GatewayJWT 计费) | `mind chat` 拉起 vendor TUI本地持有 `conversation_history` + SQLite session`sys.exit()` 杀进程 | 🔴 端有自己的脑 |
| **B能力报告义务** | CLI 连接时报告能力 | `capability.py` 无状态扫描 | ✅ 已达标 |
| **C连接即受管/断开即自治** | 两种模式互斥,边界清晰 | tunnel 单例 + recorder 单例耦合在同一进程,断开后 recorder 无受管方 | 🟡 边界模糊 |
### 1.3 最严重的功能性后果
**双工模式在代码层面不可能实现。**
设计文档 §Phase4 明确要求:
> 双工模式 = 浏览器 mic session A + CLI system session B两个独立 ASR session。
但当前 `recorder.py` 的实现:
```python
# recorder.py:35-43
_recorder: "SystemRecorder | None" = None
def get_recorder() -> "SystemRecorder":
global _recorder
if _recorder is None:
_recorder = SystemRecorder()
return _recorder
# recorder.py:110-111start 方法内)
if self._running:
return {"error": "录音已在进行中"}
```
单例 + `_running` 互斥 = 第二路录音直接返回 error。双工模式不可能。
### 1.4 参照模式anyfile2md
`hermes-overlay/infra/pipelines/anyfile2md.py`177 行)是项目内已验证的"无状态管线组合器"范式:
```
infra/atoms/ ← 原子text_sniffer / vlm_ocr / page_rasterizer ...
↑ 组合
infra/pipelines/ ← 无状态管线anyfile2md: parse() / parseLocal() / isSupported()
↑ 调用
platforms/*_sse ← SSE 子类(绑定 SSE 推送 + DB + 积分等状态化职责)
```
**关键特征**
- 管线层无 `self`、无类属性、无模块级可变状态。
- 文件头声明:"管线只做编排,不做逻辑。无状态:不写 DB、不推 SSE、不管 session。"
- 进度通知通过 `onProgress` 回调注入,管线本身不依赖它。
- 状态化职责DB / SSE / 积分)全部下沉到调用方 SSE 子类。
Mind CLI 应照此分层:**把 recorder/tunnel/managed_mcp 的核心逻辑抽成无状态管线,状态化职责上交给 health server 调用方。**
---
## 二、改造目标
### 2.1 不改的
- **不把 MindCLI 变成 bookqa_sse 那样的独立无状态 SSE 原子。** MindCLI 是 Cloud 的执行节点(设计文档 §1.1),依赖 Cloud Gateway 做 LLM 调用、依赖 Cloud ASR 做转写。它的"连接态"必须存在,不可能做到 `stateless: True`
- **不碰 `_vendor/` 内部。** vendor 快照是只读副本,内部重构应在 hermes 上游做,快照只消费。
- **不改 `capability.py``service.py`。** 已是无状态纯函数,达标。
### 2.2 要改的
| # | 改造对象 | 目标 | 价值 |
|:---:|:---|:---|:---|
| 1 | `recorder.py` | 拆成 `pipelines/audio_capture.py`(无状态)+ health handler有状态调用方 | **解锁双工模式** |
| 2 | `tunnel.py` | 拆成 `pipelines/tunnel_session.py`(无状态)+ health handler持有 handle | **消除循环耦合** |
| 3 | `managed_mcp.py` | 抽出 `pipelines/tool_proxy.py`(无状态执行)+ 实例级白名单 | 解耦生命周期 |
| 4 | `cli.py``chat`/`ask` | 走 `run_agent` headless 接口,不拉 vendor TUI | **修正铁律 A 偏差** |
### 2.3 改造后的分层架构
```
Layer 3: 调用方(有状态,进程级)
mindcli/health.py — HTTP handler
持有: _tunnel_handle, _active_captures: dict[chatId, CaptureHandle]
职责: 端口绑定、生命周期管理、状态聚合
↓ 调用
Layer 2: 无状态管线(新增,类比 infra/pipelines/
mindcli/pipelines/audio_capture.py — capture(source, ws_ep, on_text) -> CaptureHandle
mindcli/pipelines/tunnel_session.py — connect(url, jwt, on_dispatch, on_status) -> TunnelHandle
mindcli/pipelines/tool_proxy.py — execute(tool, params, approved_set) -> dict
↓ 组合
Layer 1: 原子(无状态函数,已存在)
mindcli/capability.py — scan_capabilities() -> dict
mindcli/service.py — install_service() / uninstall_service()
managed_mcp executors — _exec_terminal / _exec_file_read / ...(纯 subprocess/IO
```
---
## 三、详细设计
### 3.1 第一刀recorder.py → pipelines/audio_capture.py
**当前问题**436 行):
```
recorder.py
├── _recorder 全局单例 (L35)
├── get_recorder() 工厂 (L38)
├── SystemRecorder 类 (L46)
│ ├── __init__: _running / _ws / _source / _chat_id / _buf / _loop ... (L54)
│ ├── start(): if _running: return error (L89, L110)
│ ├── stop() (L154)
│ ├── _start_mic() (L184)
│ ├── _start_system_audio() + _on_system_audio() (L206, L277)
│ ├── _stop_capture() (L295)
│ ├── _push_loop() (L316)
│ └── _ws_recv_loop() (L352)
├── _sync_get_sharable_content() (L375) ← macOS 权限检测,无状态
├── _SCStreamDelegate 定义 (L397-432) ← pyobjc 类定义,无状态
```
**拆分方案**
```
mindcli/pipelines/audio_capture.py (新增,~200 行,无状态)
├── async def capture(source, ws_endpoint, on_text) -> CaptureHandle
├── class CaptureHandle: ← 轻量句柄,非单例
│ async def stop() -> dict
│ def status() -> dict
├── def check_permissions() -> bool ← 从 _sync_get_sharable_content 迁入
└── _SCStreamDelegate (pyobjc 类,从 recorder.py 迁入)
mindcli/health.py (修改,有状态调用方)
├── _active_captures: dict[str, CaptureHandle] = {} ← 按 chatId 索引,可多实例
├── /record/start handler:
│ handle = await capture(source, ws_ep, on_text)
│ _active_captures[chatId] = handle
└── /record/stop handler:
handle = _active_captures.pop(chatId)
await handle.stop()
```
**核心变化**
```python
# ── 改造前recorder.py单例 + 互斥)─────────────
_recorder = None
def get_recorder():
global _recorder
if _recorder is None:
_recorder = SystemRecorder()
return _recorder
class SystemRecorder:
async def start(self, ...):
if self._running:
return {"error": "录音已在进行中"}
# ...
# ── 改造后pipelines/audio_capture.py无状态───
async def capture(
source: str, # "system" | "mic"
ws_endpoint: str, # Cloud ASR WebSocket URL
on_text: Callable, # 回调:转写结果
) -> CaptureHandle:
"""启动一路音频采集,返回独立句柄。无单例、无互斥。
可多次调用,每次返回独立 handle双工模式 = 两个 handle 并存。
"""
handle = CaptureHandle(source, ws_endpoint, on_text)
await handle._start()
return handle
class CaptureHandle:
"""一路音频采集的句柄。生命周期由调用方管理。"""
# 不持有全局状态,不是单例
```
**双工模式验证**
```python
# health.py 的 /record/start handler
handle_a = await capture("mic", ws_ep, on_text) # 浏览器 mic
_active_captures["chatA"] = handle_a
handle_b = await capture("system", ws_ep, on_text) # CLI 系统音频
_active_captures["chatB"] = handle_b
# ✅ 两路独立并存,符合设计文档 §Phase4
```
### 3.2 第二刀tunnel.py → pipelines/tunnel_session.py
**当前问题**242 行):
```
tunnel.py
├── TunnelClient 类 (L23)
│ ├── __init__: _jwt / _ws / _managed_mcp / 3 个后台 Task (L34)
│ ├── activate(jwt, url) (L70)
│ ├── disconnect() (L92)
│ ├── _connect_loop() (L115) ← 重连退避
│ ├── _connect() (L139) ← 握手 + 能力上报
│ ├── _run() (L188) ← 消息循环
│ └── _handle_tool_call(msg) (L208)
├── _tunnel_client = TunnelClient() (L237) ← 模块级单例
└── get_tunnel_client() (L240)
```
**循环耦合**`tunnel.py:184` 反向 `from mindcli.health import set_tunnel_status`,写 `health._tunnel_status` 全局变量。
**拆分方案**
```
mindcli/pipelines/tunnel_session.py (新增,~150 行,无状态)
├── async def connect(url, jwt, on_dispatch, on_status) -> TunnelHandle
├── class TunnelHandle:
│ async def disconnect()
│ def status() -> str
│ def update_approved(tools)
└── (重连/心跳/消息循环逻辑迁入,状态通过 on_status 回调通知)
mindcli/health.py (修改)
├── _tunnel_handle: TunnelHandle | None = None
├── /tunnel/activate handler:
│ _tunnel_handle = await connect(url, jwt,
│ on_dispatch=self._dispatch_tool_call,
│ on_status=self._on_tunnel_status) ← 回调更新状态,不写全局变量
└── def _on_tunnel_status(self, status, tools=0):
_tunnel_status = status ← health 自己的局部状态
```
**核心变化**
```python
# ── 改造前tunnel.py单例 + 反向写 health 全局)──
_tunnel_client = TunnelClient()
class TunnelClient:
async def _connect(self):
# ...
from mindcli.health import set_tunnel_status # ← 反向耦合
set_tunnel_status("connected", len(tools))
# ── 改造后pipelines/tunnel_session.py回调注入
async def connect(
url: str,
jwt: str,
on_dispatch: Callable, # 工具调用回调
on_status: Callable, # 状态变更回调
) -> TunnelHandle:
"""建立 Tunnel 连接,状态变更通过 on_status 回调通知。
不持有全局状态,不反向 import 调用方模块。
"""
handle = TunnelHandle(url, jwt, on_dispatch, on_status)
await handle._start()
return handle
```
### 3.3 第三刀managed_mcp.py → pipelines/tool_proxy.py
**当前状态**183 行):已接近无状态,白名单是实例级 `_approved: set[str]`executors 是纯 subprocess/IO。但通过 `tunnel._managed_mcp` 间接成为进程级唯一实例。
**拆分方案**(改动最小):
```
mindcli/pipelines/tool_proxy.py (新增,~120 行)
├── async def execute(tool_name, params, approved_set) -> dict
├── _EXECUTORS: dict[str, Callable] ← 从 managed_mcp._register_executors 迁入
└── def is_approved(tool_name, approved_set) -> bool
mindcli/health.py 或 tunnel handle 持有 approved_set
```
executor 函数(`_exec_terminal` / `_exec_file_read` / `_exec_file_write` / `_exec_grep` / `_exec_file_ops` / `_exec_code`)本身就是纯 subprocess/IO直接迁入即可。
### 3.4 第四刀cli.py chat/ask 走 run_agent headless
**当前问题**
```python
# cli.py:30-43chat 命令)
@main.command()
def chat(model, skills, resume):
from cli import main as hermes_main # ← _vendor/cli.py 的 10000 行 TUI
hermes_main(**kwargs) # ← sys.exit() 杀进程
```
`_vendor/cli.py``main()`
-`os.environ["HERMES_INTERACTIVE"] = "1"`
- 注册 `atexit` 钩子
- 修改模块级 `_active_agent_ref` / `_active_worktree` / `_cleanup_done`
- 单查询模式 `sys.exit(0)` / `sys.exit(1)`
**不可当作无状态函数调用。**
**改造方案**
```python
# cli.py 改造后
@main.command()
def chat(model, skills, resume):
"""交互式 Chat — 走 Cloud GatewayJWT 计费)。"""
from run_agent import AIAgent # ← headless不拉 TUI
agent = AIAgent(
model=model or None,
base_url=_get_cloud_gateway_url(), # 指向 Cloud Gateway
api_key=_get_jwt(), # JWT from tunnel handle
)
agent.run_interactive(skills=skills, resume=resume)
@main.command()
def ask(question, model, fmt):
"""单次查询 — 走 Cloud GatewayJWT 计费)。"""
from run_agent import AIAgent
agent = AIAgent(
model=model or None,
base_url=_get_cloud_gateway_url(),
api_key=_get_jwt(),
)
result = agent.run_query(question)
_output(result, fmt)
```
**vendor `cli.py` 的 TUI 只在"离线独立模式"(铁律 C 的断开即自治)时才拉起**,作为 `mind chat --offline` 的备选路径。
### 3.5 改造后文件结构
```
mindcli/
├── __init__.py # 22 行,不变
├── __main__.py # 5 行,不变
├── cli.py # ~250 行chat/ask 改走 run_agent
├── health.py # ~350 行,持有 handles状态枢纽
├── service.py # 116 行,不变
├── capability.py # 79 行,不变
├── pipelines/ # ★ 新增目录
│ ├── __init__.py
│ ├── audio_capture.py # ~200 行,从 recorder.py 抽出
│ ├── tunnel_session.py # ~150 行,从 tunnel.py 抽出
│ └── tool_proxy.py # ~120 行,从 managed_mcp.py 抽出
└── _vendor/ # 不变
```
**删除的文件**(逻辑迁入 pipelines/ 后):
- `recorder.py` → 逻辑迁入 `pipelines/audio_capture.py`pyobjc delegate 类随之迁入
- `tunnel.py` → 逻辑迁入 `pipelines/tunnel_session.py`
- `managed_mcp.py` → 逻辑迁入 `pipelines/tool_proxy.py`
**薄壳净行数变化**
| 改造前 | 改造后 |
|:---|:---|
| 1633 行7 文件) | ~1167 行8 文件,含 pipelines/ 3 个新文件) |
净减约 466 行(单例/工厂/全局变量的模板代码消除),同时新增"双工模式可用"和"循环耦合消除"两个能力。
---
## 四、改造后的状态边界
### 4.1 状态归属表
| 状态 | 改造前归属 | 改造后归属 | 改善 |
|:---|:---|:---|:---|
| 端口 8660 + event loop | `health._loop` (全局) | `health._loop` (不变) | — |
| Tunnel 连接 + JWT | `tunnel._tunnel_client` (单例) | `health._tunnel_handle` (调用方持有) | 消除循环耦合 |
| Tunnel 状态字符串 | `health._tunnel_status` (被 tunnel 反向写) | `health._tunnel_status` (on_status 回调写) | 单向数据流 |
| 录音句柄 | `recorder._recorder` (单例 + 互斥) | `health._active_captures[chatId]` (dict可多实例) | **双工可用** |
| 工具白名单 | `tunnel._managed_mcp._approved` | `health._approved_set` 或 tunnel handle 持有 | 生命周期独立 |
| LLM 会话 | vendor `cli.py``HermesCLI.agent` + SQLite | `run_agent.AIAgent`headless走 Cloud Gateway | **铁律 A 合规** |
### 4.2 依赖图对比
**改造前**(有循环):
```
health.py ⇄ tunnel.py ← 经全局变量双向耦合
health.py → recorder.py ← 经单例耦合
tunnel.py → managed_mcp ← 经单例间接耦合
cli.py → _vendor/cli.py ← 拉起重状态 TUI
```
**改造后**(单向,无环):
```
health.py → pipelines/tunnel_session.py (调用 connect持 handle)
health.py → pipelines/audio_capture.py (调用 capture持 handle)
health.py → pipelines/tool_proxy.py (调用 execute持 approved_set)
pipelines/* → capability.py / executors (只组合原子,不反向 import health)
cli.py → run_agent.AIAgent (headless走 Cloud Gateway)
```
### 4.3 铁律合规对照
| 铁律 | 改造前 | 改造后 |
|:---|:---:|:---:|
| A决策权归云端 | 🔴 端有自己的脑vendor TUI + 本地 session | ✅ chat/ask 走 run_agent + Cloud Gateway JWT |
| B能力报告义务 | ✅ | ✅(不变) |
| C连接即受管/断开即自治 | 🟡 边界模糊 | ✅ tunnel handle + capture handle 生命周期由 health 管理,断开即释放 |
---
## 五、实施路线图
### Phase 1: audio_capture 管线 + 双工解锁2 天)
```
□ 创建 mindcli/pipelines/ 目录 + __init__.py
□ 从 recorder.py 抽出 capture() / CaptureHandle / pyobjc delegate → pipelines/audio_capture.py
□ 修改 health.py/record/start 用 _active_captures[chatId] 替代 get_recorder()
□ 修改 health.py/record/stop 从 _active_captures pop 后调 handle.stop()
□ 删除 recorder.py逻辑已迁出
□ 验证mind record start --source system + mind record start --source mic 并行成功
□ 验证mind record stop 各自独立停止
```
### Phase 2: tunnel_session 管线 + 解循环1.5 天)
```
□ 从 tunnel.py 抽出 connect() / TunnelHandle → pipelines/tunnel_session.py
□ on_status 回调替代反向 import health.set_tunnel_status
□ 修改 health.py/tunnel/activate 持有 _tunnel_handle
□ 删除 tunnel.py逻辑已迁出
□ 验证mind tunnel connect → Cloud LLM 看到 local_* 工具
□ 验证:断开后 _tunnel_handle = None状态正确回退
```
### Phase 3: tool_proxy 管线0.5 天)
```
□ 从 managed_mcp.py 抽出 execute() / _EXECUTORS → pipelines/tool_proxy.py
□ 白名单 set 由调用方传入,不再经 tunnel 单例
□ 删除 managed_mcp.py逻辑已迁出
□ 验证:工具调用正常,白名单过滤生效
```
### Phase 4: chat/ask 走 run_agent1 天)
```
□ cli.py chat/ask 改用 _vendor/run_agent.py 的 AIAgent headless 接口
□ base_url 指向 Cloud Gatewayapi_key 用 JWT
□ vendor cli.py TUI 保留为 mind chat --offline 备选
□ 验证mind ask "hello" → 走 Cloud Gateway → 积分扣费
□ 验证mind chat → 交互模式 → 走 Cloud Gateway
```
**总计 ~5 个工作日。**
---
## 六、风险与回退
| 风险 | 等级 | 缓解 |
|:---|:---:|:---|
| pyobjc delegate 类迁移后权限引导断裂 | 🟡 | Phase 1 验证时首跑 `mind record start --source system`,确认 macOS 录屏权限弹窗正常 |
| run_agent headless 接口不支持 `--skills` | 🟡 | Phase 4 先验证 `AIAgent(model=, base_url=, api_key=)` 签名,必要时在 vendor 层薄封装 |
| Cloud Gateway JWT 链路未通 | 🟠 | Phase 4 可先保留 vendor TUI 路径作为 fallbackGateway 通后再切 |
| 改造期间 _vendor 快照落后于 hermes 上游 | 🟢 | 改造不碰 _vendor与 vendor 同步纪律正交 |
**回退策略**:每个 Phase 独立,可按 Phase 回退。`recorder.py` / `tunnel.py` / `managed_mcp.py` 在 git 历史中保留,任一 Phase 失败可 revert 对应 commit。
---
## 七、决策记录
| # | 决策 | 理由 |
|:---|:---|:---|
| D1 | 选拆 recorder 为第一刀 | 双工模式是设计文档 §Phase4 的核心卖点,当前代码层面不可用,价值最高 |
| D2 | 用回调注入替代全局变量 | 消除 health ⇄ tunnel 循环耦合,单向数据流 |
| D3 | chat/ask 走 run_agent 而非 vendor TUI | 修正铁律 A决策权归云端vendor TUI 的 `sys.exit()` 和 7 个全局变量使其不可当函数调用 |
| D4 | 不把 CLI 变成 SSE 无状态原子 | CLI 是执行节点,依赖 Cloud Gateway/ASR连接态必须存在 |
| D5 | 保留 vendor cli.py 作为 `--offline` 备选 | 铁律 C断开即自治要求离线模式可用 |
| D6 | pipelines/ 目录而非 infra/ | MindOS_CLI 是独立包,不依赖 hermes-overlaypipelines/ 是自包含的无状态层 |
---
## 附录:改造前后代码对照
### A.1 录音:单例 → 多实例
```python
# ═══ 改造前recorder.py ════════════════════════════
_recorder: "SystemRecorder | None" = None
def get_recorder() -> "SystemRecorder":
global _recorder
if _recorder is None:
_recorder = SystemRecorder()
return _recorder
class SystemRecorder:
async def start(self, source, ws_ep, chat_id, on_text):
if self._running: # ← 互斥
return {"error": "录音已在进行中"}
# ...
# ═══ 改造后pipelines/audio_capture.py ════════════
async def capture(
source: str,
ws_endpoint: str,
on_text: Callable[[str], None],
) -> CaptureHandle:
"""启动一路采集,返回独立句柄。可多实例并存。"""
handle = CaptureHandle(source, ws_endpoint, on_text)
await handle._start()
return handle
class CaptureHandle:
"""一路音频采集的句柄。生命周期由调用方管理。"""
async def stop(self) -> dict: ...
def status(self) -> dict: ...
```
### A.2 Tunnel全局变量 → 回调注入
```python
# ═══ 改造前tunnel.py ══════════════════════════════
_tunnel_client = TunnelClient()
class TunnelClient:
async def _connect(self):
# ...
from mindcli.health import set_tunnel_status # ← 反向耦合
set_tunnel_status("connected", len(tools))
# ═══ 改造后pipelines/tunnel_session.py ═══════════
async def connect(
url: str,
jwt: str,
on_dispatch: Callable,
on_status: Callable, # ← 状态变更通过回调通知
) -> TunnelHandle:
handle = TunnelHandle(url, jwt, on_dispatch, on_status)
await handle._start()
return handle
# health.py 调用方
_tunnel_handle = await connect(
url, jwt,
on_dispatch=self._dispatch_tool_call,
on_status=lambda s, t=0: (_tunnel_status.__set_name__(...)) # 自己的状态自己写
)
```
### A.3 chat/askvendor TUI → run_agent headless
```python
# ═══ 改造前cli.py ════════════════════════════════
@main.command()
def chat(model, skills, resume):
from cli import main as hermes_main # ← _vendor/cli.py 10000 行 TUI
hermes_main(**kwargs) # ← sys.exit() 杀进程
# ═══ 改造后cli.py ════════════════════════════════
@main.command()
def chat(model, skills, resume):
from run_agent import AIAgent # ← headless
agent = AIAgent(
model=model or None,
base_url=_get_cloud_gateway_url(),
api_key=_get_jwt(),
)
agent.run_interactive(skills=skills, resume=resume)
```
---
*最后更新2026-07-01 v1.0*