"""Mind CLI — Health HTTP server.

Lightweight HTTP endpoint (localhost:8660) that lets the Web UI probe whether
the local CLI is online. Also exposes /tunnel/activate, which receives the
browser's JWT authorization and starts the Tunnel.

Built on http.server; introduces no extra dependencies.
"""

import asyncio
import json
import logging
import os
import platform
import sys
import time
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from urllib.parse import parse_qs, urlencode, urlparse

import mindcli

logger = logging.getLogger("mindcli.health")

# ── Runtime state (updated by the caller pipeline through callbacks) ──
_tunnel_status = "disconnected"
_tool_count = 0

# ── Active recording handles, keyed by chatId (several may coexist = duplex mode) ──
_active_captures: dict[str, "object"] = {}  # chatId → CaptureHandle

# ── Tunnel handle (created by /tunnel/activate, single connection) ──
_tunnel_handle = None  # TunnelHandle | None

# asyncio event loop (required by the tunnel)
_loop: asyncio.AbstractEventLoop | None = None


def _detect_capabilities() -> list[str]:
    """Return the list of capabilities the current CLI supports.

    "tunnel" is always reported; "audio" is added only when ``sounddevice``
    can be imported.
    """
    caps = ["tunnel"]
    try:
        import sounddevice  # noqa: F401
    except (ImportError, OSError):
        # sounddevice raises OSError (not ImportError) when the package is
        # installed but the PortAudio library cannot be loaded; treat both
        # as "no audio" instead of crashing the /health request.
        pass
    else:
        caps.append("audio")
    return caps


def _on_tunnel_status(status: str, tools: int = 0) -> None:
    """Tunnel status-change callback (passed to the tunnel as ``on_status``).

    Replaces the old design where tunnel.py imported health back and wrote its
    globals directly; data now flows one way: Tunnel → callback → health state.

    Args:
        status: New tunnel status string, reported by /health.
        tools: Tool count, reported by /health and /tunnel/status.
    """
    global _tunnel_status, _tool_count
    _tunnel_status = status
    _tool_count = tools


class _HealthHandler(BaseHTTPRequestHandler):
    """Handle the /health, /tunnel/* and /record/* requests."""

    def do_GET(self):
        """Serve the read-only status endpoints."""
        # Strip the query string before routing, consistent with do_POST
        # (/health?ts=123 → /health).
        path = self.path.split("?", 1)[0]
        if path == "/health":
            body = json.dumps({
                "ok": True,
                "version": mindcli.__version__,
                "vendor": _get_vendor_commit(),
                "tunnel": _tunnel_status,
                "tools": _tool_count,
                "platform": platform.system(),
                "pid": os.getpid(),
                "capabilities": _detect_capabilities(),
            }, ensure_ascii=False)
            self._respond(200, body)
        elif path == "/tunnel/status":
            # Snapshot the global once: the event-loop thread may rebind it
            # between a truthiness check and the attribute access.
            handle = _tunnel_handle
            body = json.dumps({
                "status": handle.status if handle else "disconnected",
                "userId": handle.user_id if handle else None,
                "tools": _tool_count,
            })
            self._respond(200, body)
        elif path == "/record/status":
            try:
                # Report every active capture (duplex mode may have several).
                # Iterate over a copy because other handler threads add and
                # remove entries concurrently.
                captures = [h.status() for h in list(_active_captures.values())]
                body = json.dumps(
                    {"running": len(captures) > 0, "captures": captures},
                    ensure_ascii=False,
                )
                self._respond(200, body)
            except Exception as e:
                self._respond(500, json.dumps({"error": str(e)}))
        else:
            self._respond(404, json.dumps({"error": "Not Found"}))

    def do_POST(self):
        """Route POST requests to the matching handler."""
        # Strip the query string before routing
        # (/record/stop?chatId=xxx → /record/stop).
        path = self.path.split("?", 1)[0]
        if path == "/tunnel/activate":
            self._handle_tunnel_activate()
        elif path == "/record/start":
            self._handle_record_start()
        elif path == "/record/stop":
            self._handle_record_stop()
        else:
            self._respond(404, json.dumps({"error": "Not Found"}))

    def _read_json_body(self):
        """Read the request body and parse it as JSON.

        Returns:
            The parsed JSON value, or an empty dict when the body is empty.

        Raises:
            ValueError: If Content-Length is not an integer or the body is
                not valid JSON (``json.JSONDecodeError`` is a ``ValueError``).
        """
        content_length = int(self.headers.get("Content-Length", 0))
        raw = self.rfile.read(content_length)
        return json.loads(raw) if raw else {}

    def _handle_tunnel_activate(self):
        """Activate the Tunnel with the authorization sent by the browser.

        Expects a JSON body with ``token`` and ``tunnelUrl``. Responds 400
        when either is missing, 500 on any failure, and 200 otherwise.
        """
        try:
            data = self._read_json_body()
            token = data.get("token")
            tunnel_url = data.get("tunnelUrl")

            if not token or not tunnel_url:
                self._respond(400, json.dumps({"error": "Missing token or tunnelUrl"}))
                return

            # Start the tunnel on the asyncio event loop
            # (stateless pipeline + callback).
            from mindcli.pipelines.tunnel_session import connect as tunnel_connect

            if _loop and _loop.is_running():
                async def _activate():
                    global _tunnel_handle
                    # Disconnect any previous handle before replacing it.
                    if _tunnel_handle:
                        await _tunnel_handle.disconnect()
                    _tunnel_handle = await tunnel_connect(
                        url=tunnel_url,
                        jwt=token,
                        on_status=_on_tunnel_status,
                    )
                    return {"ok": True, "status": "connecting"}

                # This handler runs on an HTTP worker thread, so the coroutine
                # is handed to the loop thread and awaited with a timeout.
                future = asyncio.run_coroutine_threadsafe(_activate(), _loop)
                result = future.result(timeout=5)
            else:
                result = {"ok": True, "status": "no_event_loop"}

            logger.info("[Health] Tunnel activate: %s", result)
            self._respond(200, json.dumps(result))
        except Exception as e:
            logger.error("[Health] Tunnel activate 失败: %s", e)
            self._respond(500, json.dumps({"error": str(e)}))

    def _handle_record_start(self):
        """Start local audio capture and stream it over WS to the Cloud ASR.

        Optional JSON body keys: ``token``, ``chatId``, ``meetingId``,
        ``wsUrl`` and ``source`` ("system" or "mic").
        """
        try:
            data = self._read_json_body()
            token = data.get("token", "")
            chat_id = data.get("chatId", "")
            meeting_id = data.get("meetingId", f"rec_{int(time.time() * 1000)}")

            # Build the Cloud ASR WS URL.
            ws_url = data.get("wsUrl", "")
            if not ws_url:
                # Fall back to the default Cloud address. The values come from
                # the request body, so they are URL-encoded instead of being
                # interpolated raw into the query string.
                # NOTE(review): "source" is fixed to "system" in this default
                # URL even when the request asks for "mic" — confirm intended.
                query = urlencode({
                    "token": token,
                    "chatId": chat_id,
                    "meetingId": meeting_id,
                    "source": "system",
                })
                ws_url = f"wss://agent.brainwork.club/mindos-next/ws/record?{query}"

            source = data.get("source", "system")  # "system" or "mic"

            if _loop and _loop.is_running():
                from mindcli.pipelines.audio_capture import capture as capture_audio

                future = asyncio.run_coroutine_threadsafe(
                    capture_audio(
                        ws_url=ws_url,
                        chat_id=chat_id,
                        meeting_id=meeting_id,
                        source=source,
                    ),
                    _loop,
                )
                handle = future.result(timeout=10)
                # Index by chatId (falling back to meetingId) so several
                # captures can coexist in duplex mode.
                _active_captures[chat_id or meeting_id] = handle
                result = {"ok": True, "meetingId": handle.meeting_id, "source": source}
            else:
                result = {"error": "事件循环未运行,请先 mind start"}

            self._respond(200, json.dumps(result, ensure_ascii=False))
        except Exception as e:
            logger.error("[Health] Record start 失败: %s", e)
            self._respond(500, json.dumps({"error": str(e)}))

    def _handle_record_stop(self):
        """Stop local audio capture.

        ``/record/stop?chatId=xxx`` stops a single capture; without the
        parameter every active capture is stopped.
        """
        try:
            query = parse_qs(urlparse(self.path).query)
            target_chat = query.get("chatId", [None])[0]

            if _loop and _loop.is_running():
                if target_chat:
                    # Stop only the capture registered under this chatId.
                    handle = _active_captures.pop(target_chat, None)
                    if handle:
                        future = asyncio.run_coroutine_threadsafe(handle.stop(), _loop)
                        result = future.result(timeout=10)
                    else:
                        result = {"error": f"无活跃录音 chatId={target_chat}"}
                elif _active_captures:
                    # Stop every active capture (global "stop recording").
                    results = []
                    for cid in list(_active_captures.keys()):
                        handle = _active_captures.pop(cid, None)
                        if handle:
                            future = asyncio.run_coroutine_threadsafe(handle.stop(), _loop)
                            results.append(future.result(timeout=10))
                    result = {"ok": True, "stopped": len(results), "results": results}
                else:
                    result = {"error": "未在录音"}
            else:
                result = {"error": "事件循环未运行"}

            self._respond(200, json.dumps(result, ensure_ascii=False))
        except Exception as e:
            logger.error("[Health] Record stop 失败: %s", e)
            self._respond(500, json.dumps({"error": str(e)}))

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.send_response(204)
        self._cors_headers()
        self.end_headers()

    def _respond(self, code: int, body: str):
        """Send *body* as a UTF-8 JSON response with status *code*."""
        self.send_response(code)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self._cors_headers()
        self.end_headers()
        self.wfile.write(body.encode("utf-8"))

    def _cors_headers(self):
        """Allow cross-origin browser access (Web UI → localhost:8660)."""
        # NOTE(review): "*" lets any web page call the POST endpoints,
        # including /tunnel/activate — consider restricting to the Web UI
        # origin.
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")

    def log_message(self, format, *args):
        """Silence per-request logging to avoid flooding the console."""
        pass


class _ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """Multi-threaded HTTP server, so a slow client cannot block others."""

    daemon_threads = True


def _get_vendor_commit() -> str:
    """Read the commit hash from the _vendor/VENDOR_COMMIT file.

    VENDOR_COMMIT is a multi-line marker file (# comments plus source:/
    commit:/date: lines); only the value of the ``commit:`` line is returned,
    not the whole file content.

    Returns:
        The commit value, or "unknown" when the file cannot be read or has
        no ``commit:`` line.
    """
    commit_file = os.path.join(mindcli._VENDOR_DIR, "VENDOR_COMMIT")
    try:
        with open(commit_file, encoding="utf-8") as f:
            for line in f:
                if line.startswith("commit:"):
                    return line.split(":", 1)[1].strip()
    except OSError:
        # Covers a missing file as well as permission errors and the like;
        # this runs inside the /health request, which must not fail on it.
        pass
    return "unknown"


def start_health_server(port: int = 8660, foreground: bool = False) -> HTTPServer:
    """Start the Health HTTP server plus the asyncio event loop (needed by Tunnel).

    Args:
        port: Port to listen on (default 8660), bound to 127.0.0.1.
        foreground: True = block and run in the foreground until Ctrl+C;
            False = run the HTTP server and the event loop in background
            threads and return immediately.

    Returns:
        The HTTP server instance. In foreground mode it has already been
        shut down and closed by the time this function returns.
    """
    global _loop

    # Make sure INFO-level output from all mindcli loggers reaches the console.
    logging.basicConfig(
        level=logging.INFO,
        format="%(message)s",
        force=True,
    )

    server = _ThreadedHTTPServer(("127.0.0.1", port), _HealthHandler)

    if foreground:
        print(f"🩺 Mind CLI Health Server listening on http://127.0.0.1:{port}/health")
        print(f" Tunnel activate: POST http://127.0.0.1:{port}/tunnel/activate")
        print(f" Version: {mindcli.__version__} | PID: {os.getpid()}")
        print(" Press Ctrl+C to stop.\n")

        # The HTTP server runs in its own thread.
        http_thread = threading.Thread(target=server.serve_forever, daemon=True)
        http_thread.start()

        # The asyncio event loop runs on the main thread (needed by the
        # Tunnel WebSocket).
        _loop = asyncio.new_event_loop()
        asyncio.set_event_loop(_loop)
        try:
            _loop.run_forever()
        except KeyboardInterrupt:
            print("\n🛑 Health Server stopped.")
        finally:
            # Stop accepting requests and release the listening socket
            # before closing the loop the handlers submit work to.
            server.shutdown()
            server.server_close()
            _loop.close()
    else:
        # Background mode: both HTTP and asyncio run in background threads.
        _loop = asyncio.new_event_loop()

        def _run_loop():
            asyncio.set_event_loop(_loop)
            _loop.run_forever()

        loop_thread = threading.Thread(target=_run_loop, daemon=True)
        loop_thread.start()

        http_thread = threading.Thread(target=server.serve_forever, daemon=True)
        http_thread.start()

    return server