## 架构改造

将 recorder/tunnel/managed_mcp 三个有状态单例模块拆成 mindcli/pipelines/ 下的无状态管线,状态改由调用方 health.py 持有。
参照 hermes-overlay/infra/pipelines/anyfile2md.py 的分层模式。

### pipelines/ 层(无状态)

- audio_capture.py: capture() 工厂函数返回独立 CaptureHandle,无单例无互斥,双工模式(system+mic 并行)在代码层面可用
- tunnel_session.py: connect() 工厂函数 + on_status 回调,消除 health ⇄ tunnel 循环耦合(单向数据流)
- tool_proxy.py: ToolProxy 替代 ManagedMCP,非单例

### health.py 改造

- _active_captures dict 按 chatId 索引,可多实例并存
- _tunnel_handle 由调用方持有,on_status 回调更新状态
- /record/stop 支持 ?chatId= 停单路或全停
- /record/status 返回所有活跃录音列表

### cli.py 改造

- chat/ask 走 run_agent headless + Cloud Gateway JWT(铁律 A)
- 保留 --offline 走 vendor TUI(铁律 C:断开即自治)
- mind update 修复 pipx 场景:
  - 检测 pipx venv → pipx reinstall
  - 非 pipx → sys.executable -m pip(修复 venv 里 pip 找不到)
  - 防降级保护(远端版本低于本地时不升级)
  - 远端 upgradeCmd 字段下发

### 顺手修复

- health.py / capability.py 的 HERMES_COMMIT → VENDOR_COMMIT
- 版本号 0.1.0 → 0.2.0(__init__.py + pyproject.toml)
- 新增 versions.json 仓库模板(installCmd 改为 pipx,新增 upgradeCmd)

### 删除

- recorder.py → 逻辑迁入 pipelines/audio_capture.py
- tunnel.py → 逻辑迁入 pipelines/tunnel_session.py
- managed_mcp.py → 逻辑迁入 pipelines/tool_proxy.py

SPEC: docs/SPEC_mindcli_atomization.md
326 lines
12 KiB
Python
326 lines
12 KiB
Python
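"""
Mind CLI — Health HTTP Server.

A lightweight HTTP endpoint (localhost:8660) that lets the Web UI probe
whether the local CLI is online.
Also exposes /tunnel/activate, which accepts a JWT authorization from the
browser and starts the Tunnel.
Built on http.server; no additional dependencies are introduced.
"""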
|
||
|
||
import asyncio
import json
import logging
import os
import platform
import sys
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from urllib.parse import parse_qs, urlencode, urlsplit

import mindcli
|
||
|
||
logger = logging.getLogger("mindcli.health")
|
||
|
||
# ── 运行时状态(由调用方管线通过回调更新) ──────────────
|
||
_tunnel_status = "disconnected"
|
||
_tool_count = 0
|
||
|
||
# ── 活跃录音句柄(按 chatId 索引,可多实例并存 = 双工模式)──
|
||
_active_captures: dict[str, "object"] = {} # chatId → CaptureHandle
|
||
|
||
# ── Tunnel 句柄(由 /tunnel/activate 创建,单连接)────────
|
||
_tunnel_handle = None # TunnelHandle | None
|
||
|
||
|
||
# asyncio 事件循环(tunnel 需要)
|
||
_loop: asyncio.AbstractEventLoop | None = None
|
||
|
||
def _detect_capabilities() -> list[str]:
    """Return the capability names this CLI installation supports.

    ``"tunnel"`` is always reported. ``"audio"`` is added only when the
    optional ``sounddevice`` package can actually be imported.

    Returns:
        A new list of capability names, ``"tunnel"`` first.
    """
    caps = ["tunnel"]
    try:
        import sounddevice  # noqa: F401
    except (ImportError, OSError):
        # ImportError: the package is not installed.
        # OSError: the package is installed but the PortAudio shared library
        # could not be loaded. Either way audio is unavailable, and letting
        # the error escape would break the /health endpoint.
        return caps
    caps.append("audio")
    return caps
|
||
|
||
|
||
def _on_tunnel_status(status: str, tools: int = 0) -> None:
    """Record a tunnel status change reported through the ``on_status`` callback.

    This callback replaces the older arrangement in which the tunnel module
    imported this module to write its globals, so data now flows one way:
    tunnel -> callback -> health state.

    Args:
        status: New tunnel status string.
        tools: Number of tools currently exposed; defaults to 0.
    """
    global _tunnel_status, _tool_count
    _tunnel_status, _tool_count = status, tools
|
||
|
||
|
||
class _HealthHandler(BaseHTTPRequestHandler):
    """Serve the local health, tunnel and recording endpoints as JSON.

    Routes (the query string is ignored when matching):
        GET     /health           liveness, version and capability report
        GET     /tunnel/status    state of the current tunnel handle
        GET     /record/status    status of every active capture
        POST    /tunnel/activate  start a tunnel from a browser-supplied JWT
        POST    /record/start     start an audio capture
        POST    /record/stop      stop one capture (``?chatId=``) or all
        OPTIONS (any path)        CORS preflight

    Coroutines are submitted to the module-level ``_loop`` with
    ``asyncio.run_coroutine_threadsafe`` and awaited with a timeout.
    """

    def do_GET(self):
        """Dispatch GET requests for the health and status endpoints."""
        # Match on the path only, exactly as do_POST does, so a cache-busting
        # query string such as /health?t=123 still reaches its route.
        path = self.path.split("?")[0]
        if path == "/health":
            body = json.dumps({
                "ok": True,
                "version": mindcli.__version__,
                "vendor": _get_vendor_commit(),
                "tunnel": _tunnel_status,
                "tools": _tool_count,
                "platform": platform.system(),
                "pid": os.getpid(),
                "capabilities": _detect_capabilities(),
            }, ensure_ascii=False)
            self._respond(200, body)
        elif path == "/tunnel/status":
            # Read the global once so both fields describe the same handle
            # even if /tunnel/activate swaps it concurrently.
            handle = _tunnel_handle
            body = json.dumps({
                "status": handle.status if handle else "disconnected",
                "userId": handle.user_id if handle else None,
                "tools": _tool_count,
            })
            self._respond(200, body)
        elif path == "/record/status":
            try:
                # Snapshot the values: other request threads may add or
                # remove captures while this one is iterating.
                captures = [h.status() for h in list(_active_captures.values())]
                body = json.dumps(
                    {"running": len(captures) > 0, "captures": captures},
                    ensure_ascii=False,
                )
                self._respond(200, body)
            except Exception as e:
                self._respond(500, json.dumps({"error": str(e)}))
        else:
            self._respond(404, json.dumps({"error": "Not Found"}))

    def do_POST(self):
        """Dispatch POST requests for the tunnel and recording endpoints."""
        # Drop the query string before routing
        # (/record/stop?chatId=xxx -> /record/stop).
        path = self.path.split("?")[0]
        if path == "/tunnel/activate":
            self._handle_tunnel_activate()
        elif path == "/record/start":
            self._handle_record_start()
        elif path == "/record/stop":
            self._handle_record_stop()
        else:
            self._respond(404, json.dumps({"error": "Not Found"}))

    def _handle_tunnel_activate(self):
        """Activate the tunnel using the browser-supplied authorization.

        Expects a JSON body with ``token`` and ``tunnelUrl``. Responds 400
        when either is missing, 500 on any exception, and 200 otherwise.
        When no event loop is running the response is still 200, with
        ``status`` set to ``"no_event_loop"``.
        """
        try:
            content_length = int(self.headers.get("Content-Length", 0))
            raw = self.rfile.read(content_length)
            data = json.loads(raw)

            token = data.get("token")
            tunnel_url = data.get("tunnelUrl")

            if not token or not tunnel_url:
                self._respond(400, json.dumps({"error": "Missing token or tunnelUrl"}))
                return

            # Imported lazily so that merely loading this module does not
            # pull in the tunnel pipeline.
            from mindcli.pipelines.tunnel_session import connect as tunnel_connect

            if _loop and _loop.is_running():
                async def _activate():
                    global _tunnel_handle
                    # Disconnect any previous handle before replacing it.
                    if _tunnel_handle:
                        await _tunnel_handle.disconnect()
                    _tunnel_handle = await tunnel_connect(
                        url=tunnel_url,
                        jwt=token,
                        on_status=_on_tunnel_status,
                    )
                    return {"ok": True, "status": "connecting"}

                future = asyncio.run_coroutine_threadsafe(_activate(), _loop)
                result = future.result(timeout=5)
            else:
                result = {"ok": True, "status": "no_event_loop"}

            logger.info("[Health] Tunnel activate: %s", result)
            self._respond(200, json.dumps(result))

        except Exception as e:
            logger.error("[Health] Tunnel activate 失败: %s", e)
            self._respond(500, json.dumps({"error": str(e)}))

    def _handle_record_start(self):
        """Start a local audio capture that streams to the Cloud ASR WebSocket.

        Optional JSON body fields: ``token``, ``chatId``, ``meetingId``
        (defaults to a millisecond-timestamp id), ``wsUrl`` and ``source``
        (``"system"`` or ``"mic"``, default ``"system"``). The handle is
        stored in ``_active_captures`` under ``chatId``, or ``meetingId``
        when ``chatId`` is empty.
        """
        try:
            content_length = int(self.headers.get("Content-Length", 0))
            raw = self.rfile.read(content_length)
            data = json.loads(raw) if raw else {}

            token = data.get("token", "")
            chat_id = data.get("chatId", "")
            meeting_id = data.get("meetingId", f"rec_{int(time.time() * 1000)}")
            source = data.get("source", "system")  # "system" or "mic"

            # Build the Cloud ASR WS URL unless the caller supplied one.
            ws_base = data.get("wsUrl", "")
            if not ws_base:
                # urlencode escapes reserved characters in the values, and
                # the URL now carries the requested source rather than a
                # hard-coded "system".
                query = urlencode({
                    "token": token,
                    "chatId": chat_id,
                    "meetingId": meeting_id,
                    "source": source,
                })
                ws_base = f"wss://agent.brainwork.club/mindos-next/ws/record?{query}"

            if _loop and _loop.is_running():
                from mindcli.pipelines.audio_capture import capture as capture_audio

                future = asyncio.run_coroutine_threadsafe(
                    capture_audio(
                        ws_url=ws_base, chat_id=chat_id,
                        meeting_id=meeting_id, source=source,
                    ),
                    _loop,
                )
                handle = future.result(timeout=10)
                # NOTE(review): a second capture under the same key replaces
                # the stored handle without stopping it, after which
                # /record/stop can no longer reach it. Confirm whether
                # duplex captures are expected to share a chatId.
                _active_captures[chat_id or meeting_id] = handle
                result = {"ok": True, "meetingId": handle.meeting_id, "source": source}
            else:
                result = {"error": "事件循环未运行,请先 mind start"}

            self._respond(200, json.dumps(result, ensure_ascii=False))
        except Exception as e:
            logger.error("[Health] Record start 失败: %s", e)
            self._respond(500, json.dumps({"error": str(e)}))

    def _handle_record_stop(self):
        """Stop one capture (``?chatId=xxx``) or every active capture.

        Handles are removed from ``_active_captures`` before their ``stop()``
        coroutine is awaited on the event loop.
        """
        try:
            query = parse_qs(urlsplit(self.path).query)
            target_chat = query.get("chatId", [None])[0]

            if _loop and _loop.is_running():
                if target_chat:
                    # Stop the single capture registered under this chatId.
                    handle = _active_captures.pop(target_chat, None)
                    if handle:
                        future = asyncio.run_coroutine_threadsafe(handle.stop(), _loop)
                        result = future.result(timeout=10)
                    else:
                        result = {"error": f"无活跃录音 chatId={target_chat}"}
                elif _active_captures:
                    # Stop everything; iterate over a key snapshot because
                    # the dict shrinks as handles are popped.
                    results = []
                    for cid in list(_active_captures):
                        handle = _active_captures.pop(cid, None)
                        if handle:
                            future = asyncio.run_coroutine_threadsafe(handle.stop(), _loop)
                            results.append(future.result(timeout=10))
                    result = {"ok": True, "stopped": len(results), "results": results}
                else:
                    result = {"error": "未在录音"}
            else:
                result = {"error": "事件循环未运行"}

            self._respond(200, json.dumps(result, ensure_ascii=False))
        except Exception as e:
            logger.error("[Health] Record stop 失败: %s", e)
            self._respond(500, json.dumps({"error": str(e)}))

    def do_OPTIONS(self):
        """Answer CORS preflight requests with 204 and the CORS headers."""
        self.send_response(204)
        self._cors_headers()
        self.end_headers()

    def _respond(self, code: int, body: str):
        """Send ``body`` as a UTF-8 JSON response with status ``code``."""
        payload = body.encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        # Declare the length explicitly so clients do not have to rely on
        # the connection closing to find the end of the body.
        self.send_header("Content-Length", str(len(payload)))
        self._cors_headers()
        self.end_headers()
        self.wfile.write(payload)

    def _cors_headers(self):
        """Emit headers that let browser pages call this server cross-origin."""
        # NOTE(review): the wildcard origin lets any web page invoke these
        # endpoints, including /record/start. Consider restricting it to the
        # Web UI's origin(s).
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")

    def log_message(self, format, *args):
        """Suppress the default per-request logging to keep the console quiet."""
        pass
|
||
|
||
|
||
class _ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTP server that handles each request on its own thread.

    This keeps one slow client from blocking the others.
    """

    # Run handler threads as daemon threads.
    daemon_threads: bool = True
|
||
|
||
|
||
def _get_vendor_commit() -> str:
    """Return the contents of the ``VENDOR_COMMIT`` file in the vendor directory.

    Returns:
        The file contents with surrounding whitespace stripped, or
        ``"unknown"`` when the file is missing, unreadable, or not valid
        UTF-8.
    """
    commit_file = os.path.join(mindcli._VENDOR_DIR, "VENDOR_COMMIT")
    try:
        with open(commit_file, encoding="utf-8") as f:
            return f.read().strip()
    except (OSError, UnicodeDecodeError):
        # This value is only informational in /health, so any read problem
        # (missing file, permissions, a directory at that path, bad bytes)
        # degrades to "unknown" instead of failing the request.
        return "unknown"
|
||
|
||
|
||
def start_health_server(port: int = 8660, foreground: bool = False) -> HTTPServer:
    """Start the health HTTP server together with the asyncio event loop.

    The HTTP server always runs on a daemon thread. The event loop, which
    request handlers submit tunnel and capture coroutines to, runs on the
    calling thread in foreground mode and on a second daemon thread
    otherwise.

    Args:
        port: TCP port to listen on at 127.0.0.1 (default 8660).
        foreground: True blocks the caller until Ctrl+C; False returns
            immediately and leaves everything running in the background.

    Returns:
        The HTTP server instance. In foreground mode it has already been
        shut down and closed by the time this function returns.
    """
    global _loop

    # Make INFO-level records from the mindcli loggers reach the console.
    # force=True replaces any logging configuration installed earlier.
    logging.basicConfig(
        level=logging.INFO,
        format="%(message)s",
        force=True,
    )

    server = _ThreadedHTTPServer(("127.0.0.1", port), _HealthHandler)

    # Publish the loop before the HTTP thread starts, so no request handler
    # can see a missing loop during startup.
    _loop = asyncio.new_event_loop()
    http_thread = threading.Thread(target=server.serve_forever, daemon=True)

    if foreground:
        print(f"🩺 Mind CLI Health Server listening on http://127.0.0.1:{port}/health")
        print(f" Tunnel activate: POST http://127.0.0.1:{port}/tunnel/activate")
        print(f" Version: {mindcli.__version__} | PID: {os.getpid()}")
        print(" Press Ctrl+C to stop.\n")

        http_thread.start()

        # The event loop owns the calling thread.
        asyncio.set_event_loop(_loop)
        try:
            _loop.run_forever()
        except KeyboardInterrupt:
            print("\n🛑 Health Server stopped.")
        finally:
            # Stop accepting requests first so nothing submits work to a
            # closed loop, then release the listening socket, then close
            # the loop.
            server.shutdown()
            server.server_close()
            _loop.close()
    else:
        # Background mode: both the loop and the HTTP server run on
        # daemon threads.
        def _run_loop():
            asyncio.set_event_loop(_loop)
            _loop.run_forever()

        loop_thread = threading.Thread(target=_run_loop, daemon=True)
        loop_thread.start()
        http_thread.start()

    return server
|