MindOS_CLI/mindcli/health.py
lidf 69dd868e2f init: MindOS CLI 本地执行体(从 mindOSv2/mindos-cli 独立)
- 独立 pyproject.toml(pip install -e .)
- vendor_hermes.sh 已改为显式路径模式(不再依赖相对目录)
- 包含 hermes vendor 快照
2026-04-28 13:12:54 +08:00

279 lines
9.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Mind CLI — Health HTTP Server。
轻量 HTTP 端点localhost:8660供 Web UI 探测本地 CLI 是否在线。
新增 /tunnel/activate 端点,接收浏览器 JWT 授权并启动 Tunnel。
基于 http.server不引入额外依赖。
"""
import asyncio
import json
import logging
import os
import platform
import sys
import time
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
import mindcli
logger = logging.getLogger("mindcli.health")
# ── Global state (written by the Phase 2 tunnel.py) ──────────────────
# Tunnel state string reported by GET /health; starts as "disconnected".
_tunnel_status: str = "disconnected"
# Tool count reported by GET /health and GET /tunnel/status.
_tool_count: int = 0
def set_tunnel_status(status: str, tools: int = 0) -> None:
    """Record the tunnel state that this module reports over HTTP.

    Called by tunnel.py. Both module-level values are overwritten on every
    call, so omitting ``tools`` resets the tool count to 0.

    Args:
        status: New tunnel status string.
        tools: Number of tools to report alongside the status.
    """
    global _tunnel_status, _tool_count
    _tunnel_status, _tool_count = status, tools
# asyncio event loop (needed by the tunnel). Stays None until
# start_health_server() creates and assigns a loop.
_loop: asyncio.AbstractEventLoop | None = None
def _detect_capabilities() -> list[str]:
    """Return the capability names supported by this CLI installation.

    ``"tunnel"`` is always reported. ``"audio"`` is added only when the
    optional ``sounddevice`` package can actually be imported.

    Returns:
        A fresh list on every call: ``["tunnel"]`` or ``["tunnel", "audio"]``.
    """
    caps = ["tunnel"]
    try:
        import sounddevice  # noqa: F401
    except (ImportError, OSError):
        # ImportError: the package is not installed.
        # OSError: sounddevice raises it at import time when the PortAudio
        # shared library cannot be loaded. Either way audio is unavailable,
        # and a capability probe must never take the caller down with it.
        pass
    else:
        caps.append("audio")
    return caps
class _HealthHandler(BaseHTTPRequestHandler):
    """Serve the local health, tunnel and recording endpoints as JSON.

    Routes (matched against the exact request path):
        GET     /health           version, vendor commit, tunnel state, capabilities
        GET     /tunnel/status    status and user id of the tunnel client
        GET     /record/status    recorder status
        POST    /tunnel/activate  hand a browser-supplied token to the tunnel client
        POST    /record/start     start a recording
        POST    /record/stop      stop the recording
        OPTIONS (any path)        CORS preflight

    Coroutines are not run on the HTTP thread: they are submitted to the
    module-level ``_loop`` and the handler blocks on the returned future.
    """

    # ── request dispatch ────────────────────────────────────────────

    def do_GET(self):
        """Dispatch GET requests; unknown paths get a JSON 404."""
        if self.path == "/health":
            payload = {
                "ok": True,
                "version": mindcli.__version__,
                "vendor": _get_vendor_commit(),
                "tunnel": _tunnel_status,
                "tools": _tool_count,
                "platform": platform.system(),
                "pid": os.getpid(),
                "capabilities": _detect_capabilities(),
            }
            self._respond(200, json.dumps(payload, ensure_ascii=False))
        elif self.path == "/tunnel/status":
            # Guarded like /record/status so an import or client failure
            # yields a JSON 500 instead of a dropped connection.
            try:
                from mindcli.tunnel import get_tunnel_client
                client = get_tunnel_client()
                body = json.dumps({
                    "status": client.status,
                    "userId": client.user_id,
                    "tools": _tool_count,
                })
            except Exception as e:
                self._respond(500, self._error_body(e))
            else:
                self._respond(200, body)
        elif self.path == "/record/status":
            try:
                from mindcli.recorder import get_recorder
                body = json.dumps(get_recorder().status(), ensure_ascii=False)
            except Exception as e:
                self._respond(500, self._error_body(e))
            else:
                self._respond(200, body)
        else:
            self._respond(404, json.dumps({"error": "Not Found"}))

    def do_POST(self):
        """Dispatch POST requests; unknown paths get a JSON 404."""
        if self.path == "/tunnel/activate":
            self._handle_tunnel_activate()
        elif self.path == "/record/start":
            self._handle_record_start()
        elif self.path == "/record/stop":
            self._handle_record_stop()
        else:
            self._respond(404, json.dumps({"error": "Not Found"}))

    def do_OPTIONS(self):
        """Answer a CORS preflight request with 204 and the CORS headers."""
        self.send_response(204)
        self._cors_headers()
        self.end_headers()

    # ── endpoint handlers ───────────────────────────────────────────

    def _handle_tunnel_activate(self):
        """Activate the tunnel with the token and tunnel URL from the body.

        Responds 400 when the body is not a JSON object or lacks ``token`` /
        ``tunnelUrl``, 500 when activation fails or exceeds 5 seconds, and
        200 with the activation result otherwise.
        """
        try:
            data = self._read_json_body()
        except ValueError as e:
            self._respond(400, self._error_body(e))
            return

        token = data.get("token")
        tunnel_url = data.get("tunnelUrl")
        if not token or not tunnel_url:
            self._respond(400, json.dumps({"error": "Missing token or tunnelUrl"}))
            return

        try:
            from mindcli.tunnel import get_tunnel_client
            client = get_tunnel_client()
            loop = self._running_loop()
            if loop is not None:
                # The coroutine is created only when a loop exists to run it.
                future = asyncio.run_coroutine_threadsafe(
                    client.activate(token, tunnel_url), loop
                )
                result = future.result(timeout=5)
            else:
                result = {"ok": True, "status": "no_event_loop"}
            logger.info("[Health] Tunnel activate: %s", result)
            body = json.dumps(result)
        except Exception as e:
            logger.error("[Health] Tunnel activate 失败: %s", e)
            self._respond(500, self._error_body(e))
        else:
            self._respond(200, body)

    def _handle_record_start(self):
        """Start a recording described by the (optional) JSON body.

        Recognised keys: ``token``, ``chatId``, ``meetingId``, ``wsUrl`` and
        ``source`` ("system" or "mic"). An empty body is accepted. When
        ``wsUrl`` is missing or empty a default URL is built from the other
        fields. Responds 400 for a malformed body and 500 when the recorder
        fails or exceeds 10 seconds.
        """
        try:
            data = self._read_json_body(allow_empty=True)
        except ValueError as e:
            self._respond(400, self._error_body(e))
            return

        try:
            chat_id = data.get("chatId", "")
            meeting_id = data.get("meetingId", f"rec_{int(time.time() * 1000)}")
            ws_url = data.get("wsUrl", "") or self._build_record_ws_url(
                data.get("token", ""), chat_id, meeting_id
            )
            source = data.get("source", "system")  # "system" or "mic"

            from mindcli.recorder import get_recorder
            recorder = get_recorder()
            loop = self._running_loop()
            if loop is not None:
                future = asyncio.run_coroutine_threadsafe(
                    recorder.start(
                        ws_url=ws_url, chat_id=chat_id,
                        meeting_id=meeting_id, source=source,
                    ),
                    loop,
                )
                result = future.result(timeout=10)
            else:
                result = {"error": "事件循环未运行,请先 mind start"}
            body = json.dumps(result, ensure_ascii=False)
        except Exception as e:
            logger.error("[Health] Record start 失败: %s", e)
            self._respond(500, self._error_body(e))
        else:
            self._respond(200, body)

    def _handle_record_stop(self):
        """Stop the recording and return the recorder's result as JSON."""
        try:
            from mindcli.recorder import get_recorder
            recorder = get_recorder()
            loop = self._running_loop()
            if loop is not None:
                future = asyncio.run_coroutine_threadsafe(recorder.stop(), loop)
                result = future.result(timeout=10)
            else:
                result = {"error": "事件循环未运行"}
            body = json.dumps(result, ensure_ascii=False)
        except Exception as e:
            logger.error("[Health] Record stop 失败: %s", e)
            self._respond(500, self._error_body(e))
        else:
            self._respond(200, body)

    # ── helpers ─────────────────────────────────────────────────────

    def _read_json_body(self, allow_empty: bool = False) -> dict:
        """Read the request body and decode it as a JSON object.

        Args:
            allow_empty: Return ``{}`` for an empty body instead of raising.

        Returns:
            The decoded JSON object.

        Raises:
            ValueError: The Content-Length header is not an integer, the
                body is empty (and ``allow_empty`` is false), is not valid
                JSON, or is not a JSON object.
        """
        try:
            length = int(self.headers.get("Content-Length") or 0)
        except ValueError:
            raise ValueError("Invalid Content-Length header") from None
        # read() with a negative size blocks until EOF, which on a socket is
        # when the client hangs up; treat non-positive lengths as "no body".
        raw = self.rfile.read(length) if length > 0 else b""
        if not raw:
            if allow_empty:
                return {}
            raise ValueError("Request body is empty")
        try:
            data = json.loads(raw)
        except ValueError as exc:  # JSONDecodeError and UnicodeDecodeError
            raise ValueError(f"Invalid JSON body: {exc}") from None
        if not isinstance(data, dict):
            raise ValueError("JSON body must be an object")
        return data

    @staticmethod
    def _running_loop():
        """Return the module-level event loop if it is running, else None."""
        loop = _loop
        if loop is not None and loop.is_running():
            return loop
        return None

    @staticmethod
    def _build_record_ws_url(token, chat_id, meeting_id) -> str:
        """Build the default recording WebSocket URL with an encoded query.

        Values are URL-encoded so characters such as ``&``, ``=`` or spaces
        in an id cannot corrupt the query string.
        """
        from urllib.parse import urlencode

        # NOTE(review): "source" is always "system" here even when the request
        # asked for "mic"; kept as in the original, confirm this is intended.
        query = urlencode({
            "token": token,
            "chatId": chat_id,
            "meetingId": meeting_id,
            "source": "system",
        })
        return f"wss://agent.brainwork.club/mindos-next/ws/record?{query}"

    @staticmethod
    def _error_body(exc: Exception) -> str:
        """Serialise an exception as ``{"error": ...}``.

        Falls back to the exception class name when ``str(exc)`` is empty,
        which is the case for timeouts raised by ``Future.result``.
        """
        return json.dumps({"error": str(exc) or type(exc).__name__})

    def _respond(self, code: int, body: str):
        """Send ``body`` as a UTF-8 JSON response with the given status code."""
        payload = body.encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        # Lets clients know where the body ends without waiting for close.
        self.send_header("Content-Length", str(len(payload)))
        self._cors_headers()
        self.end_headers()
        self.wfile.write(payload)

    def _cors_headers(self):
        """Add CORS headers so the browser Web UI can call localhost:8660."""
        # NOTE(review): the wildcard origin, combined with handlers that do
        # not check Origin or Content-Type, lets any web page call these
        # endpoints; consider restricting to the Web UI origin.
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")

    def log_message(self, format, *args):
        """Suppress the default per-request log lines to keep output quiet."""
        pass
class _ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTP server that serves each request on its own thread.

    Threading keeps one slow client from blocking the others. The handler
    threads are marked as daemon threads.
    """

    daemon_threads = True
def _get_vendor_commit() -> str:
    """Return the contents of the vendor ``HERMES_COMMIT`` file.

    The file is looked up in ``mindcli._VENDOR_DIR`` and its contents are
    returned with surrounding whitespace stripped.

    Returns:
        The stripped file contents, or ``"unknown"`` when the file is
        missing, unreadable (permissions, path is a directory, ...) or not
        valid UTF-8. This value feeds the /health probe, so it never raises
        for a file-system problem.
    """
    commit_file = os.path.join(mindcli._VENDOR_DIR, "HERMES_COMMIT")
    try:
        with open(commit_file, encoding="utf-8") as f:
            return f.read().strip()
    except (OSError, UnicodeDecodeError):
        # OSError covers FileNotFoundError as well as PermissionError and
        # IsADirectoryError, which the narrower handler let escape.
        return "unknown"
def start_health_server(port: int = 8660, foreground: bool = False) -> HTTPServer | None:
    """Start the health HTTP server and the asyncio loop the tunnel needs.

    The server binds to 127.0.0.1 only. The created event loop is stored in
    the module-level ``_loop`` so request handlers can submit coroutines to
    it. Logging is reconfigured (``force=True``) so INFO messages are
    printed as bare text.

    Args:
        port: TCP port to listen on (default 8660).
        foreground: When true, serve HTTP on a daemon thread and run the
            event loop on the calling thread until Ctrl+C. When false, run
            both on daemon threads and return immediately.

    Returns:
        The running server in background mode; ``None`` once a foreground
        run has been stopped (its server is already shut down and closed).

    Raises:
        OSError: The port cannot be bound.
    """
    global _loop
    # Make every logger under mindcli print INFO-level output to the console.
    logging.basicConfig(
        level=logging.INFO,
        format="%(message)s",
        force=True,
    )
    server = _ThreadedHTTPServer(("127.0.0.1", port), _HealthHandler)
    http_thread = threading.Thread(target=server.serve_forever, daemon=True)

    if foreground:
        print(f"🩺 Mind CLI Health Server listening on http://127.0.0.1:{port}/health")
        print(f" Tunnel activate: POST http://127.0.0.1:{port}/tunnel/activate")
        print(f" Version: {mindcli.__version__} | PID: {os.getpid()}")
        print(" Press Ctrl+C to stop.\n")
        # HTTP server on its own thread.
        http_thread.start()
        # The event loop owns the calling thread (the tunnel WebSocket needs it).
        _loop = asyncio.new_event_loop()
        asyncio.set_event_loop(_loop)
        try:
            _loop.run_forever()
        except KeyboardInterrupt:
            print("\n🛑 Health Server stopped.")
        finally:
            # Stop serving before closing the loop so no handler submits
            # work to a closed loop, then release the listening socket.
            server.shutdown()
            server.server_close()
            _loop.close()
        return None

    # Background mode: both the event loop and the HTTP server run on
    # daemon threads.
    _loop = asyncio.new_event_loop()

    def _run_loop():
        asyncio.set_event_loop(_loop)
        _loop.run_forever()

    loop_thread = threading.Thread(target=_run_loop, daemon=True)
    loop_thread.start()
    http_thread.start()
    return server