""" Mind CLI — 系统拾音引擎 (Phase 4) 双层架构·音频服务层: CLI 作为音频服务层的一个独立源,只负责采集系统音频。 浏览器/APK 负责麦克风。双工模式下两路各走各的 Cloud ASR session。 支持两种采集模式(由调用方指定): - "system"(默认):ScreenCaptureKit 系统音频(macOS only) - "mic":sounddevice 麦克风(无浏览器的 fallback 场景) 不做混音。混音 = 伪需求。"关联两份转写"是 Agent 层的智力工作。 Cloud 端复用 dashscope_realtime.py 管线,零新增代码。 协议与前端 RecordingService 完全一致: 客户端→服务端: binary PCM16 帧 / {"type":"stop"} 服务端→客户端: {"type":"partial/final","text":"..."} """ import asyncio import json import logging import threading import time from typing import Callable, Literal logger = logging.getLogger("mindcli.recorder") # ── 常量 ────────────────────────────────────────────────── TARGET_SAMPLE_RATE = 16000 # Cloud ASR 要求 16kHz CHUNK_DURATION_MS = 100 # 每帧 100ms CHUNK_SAMPLES = TARGET_SAMPLE_RATE * CHUNK_DURATION_MS // 1000 # 1600 # ── 全局单例 ────────────────────────────────────────────── _recorder: "SystemRecorder | None" = None def get_recorder() -> "SystemRecorder": """获取全局 SystemRecorder 单例。""" global _recorder if _recorder is None: _recorder = SystemRecorder() return _recorder class SystemRecorder: """ 系统拾音引擎 — 音频服务层的一个独立源。 只负责单一音频源的采集 + WS 推送。 不做混音(双工模式下浏览器和 CLI 各自独立推送到 Cloud ASR)。 """ def __init__(self): self._running = False self._ws = None self._source: str = "system" self._start_time = 0.0 self._chat_id = "" self._meeting_id = "" # 音频缓冲区(线程安全) self._audio_buf: bytearray = bytearray() self._buf_lock = threading.Lock() # 采集资源 self._mic_stream = None # sounddevice.InputStream self._sc_stream = None # SCStream self._sc_delegate = None # 推送线程 self._push_thread: threading.Thread | None = None # asyncio 事件循环引用(start 时保存) self._loop: asyncio.AbstractEventLoop | None = None # 事件回调 self._on_text: Callable[[str, str], None] | None = None @property def is_running(self) -> bool: return self._running def status(self) -> dict: """返回当前录音状态。""" return { "running": self._running, "source": self._source if self._running else None, "duration": round(time.time() - self._start_time, 1) if self._running else 0, "chatId": 
self._chat_id, "meetingId": self._meeting_id, } async def start( self, ws_url: str, chat_id: str = "", meeting_id: str = "", source: Literal["system", "mic"] = "system", on_text: Callable[[str, str], None] | None = None, ) -> dict: """ 开始录音(单一源)。 Args: ws_url: Cloud ASR WebSocket URL(含 token/chatId/meetingId query) chat_id: 对话 ID meeting_id: 录音批次 ID source: "system"(ScreenCaptureKit)或 "mic"(sounddevice) on_text: 收到转写文本的回调 (type, text) Returns: {"ok": True, "meetingId": "..."} 或 {"error": "..."} """ if self._running: return {"error": "录音已在进行中"} self._chat_id = chat_id self._meeting_id = meeting_id or f"cli_rec_{int(time.time() * 1000)}" self._source = source self._on_text = on_text self._audio_buf.clear() self._loop = asyncio.get_running_loop() # 1. 连接 Cloud ASR WebSocket try: import websockets self._ws = await websockets.connect(ws_url) logger.info("[Recorder] WS 已连接: %s", ws_url[:80]) except Exception as e: logger.error("[Recorder] WS 连接失败: %s", e) return {"error": f"WebSocket 连接失败: {e}"} # 2. 启动音频采集 try: if source == "system": self._start_system_audio() else: self._start_mic() except Exception as e: logger.error("[Recorder] 音频源 '%s' 启动失败: %s", source, e) await self._ws.close() self._ws = None return {"error": f"音频源启动失败: {e}"} # 3. 启动推送线程 self._running = True self._start_time = time.time() self._push_thread = threading.Thread(target=self._push_loop, daemon=True) self._push_thread.start() # 4. 
启动 WS 接收协程(转写结果) asyncio.create_task(self._ws_recv_loop()) logger.info("[Recorder] 录音已开始 source=%s chatId=%s meetingId=%s", source, chat_id, self._meeting_id) return {"ok": True, "meetingId": self._meeting_id, "source": source} async def stop(self) -> dict: """停止录音。""" if not self._running: return {"error": "未在录音"} self._running = False duration = round(time.time() - self._start_time, 1) # 停止音频源 self._stop_capture() # 等待推送线程结束 if self._push_thread and self._push_thread.is_alive(): self._push_thread.join(timeout=3) # 发送 stop 命令 if self._ws: try: await self._ws.send(json.dumps({"type": "stop"})) await asyncio.sleep(1) await self._ws.close() except Exception: pass self._ws = None logger.info("[Recorder] 录音已停止 duration=%.1fs", duration) return {"ok": True, "duration": duration, "meetingId": self._meeting_id} # ── 麦克风采集(sounddevice)──────────────────────────── def _start_mic(self): """启动麦克风捕获 16kHz mono int16。""" import sounddevice as sd def _callback(indata, frames, time_info, status): if status: logger.debug("[Recorder] mic status: %s", status) with self._buf_lock: self._audio_buf.extend(indata.tobytes()) self._mic_stream = sd.InputStream( samplerate=TARGET_SAMPLE_RATE, channels=1, dtype="int16", blocksize=CHUNK_SAMPLES, callback=_callback, ) self._mic_stream.start() logger.info("[Recorder] 麦克风已启动 @%dHz", TARGET_SAMPLE_RATE) # ── 系统音频采集(ScreenCaptureKit)───────────────────── def _start_system_audio(self): """启动 macOS 系统音频捕获。""" import platform if platform.system() != "Darwin": raise RuntimeError("系统音频仅支持 macOS") try: from ScreenCaptureKit import ( SCStream, SCStreamConfiguration, SCContentFilter, SCStreamOutputTypeAudio, ) from dispatch import dispatch_queue_create, DISPATCH_QUEUE_SERIAL except ImportError as e: raise RuntimeError( f"缺少 pyobjc 依赖,请运行: pip install mindos-cli[audio]\n{e}" ) # 获取主显示器 content = _sync_get_sharable_content() if not content or not content.displays(): raise RuntimeError("无法获取显示器列表") display = content.displays()[0] # 配置:只捕获音频 config = 
SCStreamConfiguration.alloc().init() config.setCapturesAudio_(True) config.setExcludesCurrentProcessAudio_(True) config.setChannelCount_(1) config.setSampleRate_(float(TARGET_SAMPLE_RATE)) # 内容过滤器 content_filter = SCContentFilter.alloc().initWithDisplay_excludingWindows_( display, [] ) # 创建 delegate _ensure_delegate_class() self._sc_delegate = _SCStreamDelegate.alloc().init() self._sc_delegate._recorder = self # 创建 stream self._sc_stream = SCStream.alloc().initWithFilter_configuration_delegate_( content_filter, config, None ) # dispatch queue queue = dispatch_queue_create(b"mindcli.audio", DISPATCH_QUEUE_SERIAL) self._sc_stream.addStreamOutput_type_sampleHandlerQueue_error_( self._sc_delegate, SCStreamOutputTypeAudio, queue, None ) # 启动 event = threading.Event() error_holder = [None] def _on_start(error): if error: error_holder[0] = str(error) event.set() self._sc_stream.startCaptureWithCompletionHandler_(_on_start) event.wait(timeout=5) if error_holder[0]: raise RuntimeError(f"SCStream 启动失败: {error_holder[0]}") logger.info("[Recorder] 系统音频已启动 (ScreenCaptureKit @%dHz)", TARGET_SAMPLE_RATE) def _on_system_audio(self, raw_bytes: bytes): """系统音频回调。SCStream 输出 float32 PCM,需要转为 int16。""" import struct # float32: 每个样本 4 字节;int16: 每个样本 2 字节 n_samples = len(raw_bytes) // 4 if n_samples == 0: return # 解包 float32 floats = struct.unpack(f'<{n_samples}f', raw_bytes[:n_samples * 4]) # clamp [-1, 1] → scale to int16 range int16_data = struct.pack(f'<{n_samples}h', *(max(-32768, min(32767, int(s * 32767))) for s in floats) ) with self._buf_lock: self._audio_buf.extend(int16_data) # ── 停止采集 ───────────────────────────────────────────── def _stop_capture(self): """停止当前音频源。""" # 麦克风 if self._mic_stream: try: self._mic_stream.stop() self._mic_stream.close() except Exception: pass self._mic_stream = None # 系统音频 if self._sc_stream: event = threading.Event() self._sc_stream.stopCaptureWithCompletionHandler_(lambda e: event.set()) event.wait(timeout=3) self._sc_stream = None 
self._sc_delegate = None # ── WS 推送 ────────────────────────────────────────────── def _push_loop(self): """后台线程:定时取缓冲区 → WS 推送 PCM16 帧。""" frame_bytes = CHUNK_SAMPLES * 2 # int16 = 2 bytes/sample = 3200 send_count = 0 while self._running: time.sleep(CHUNK_DURATION_MS / 1000.0) # 一次取尽缓冲区 with self._buf_lock: if len(self._audio_buf) < frame_bytes: continue pending = bytes(self._audio_buf) self._audio_buf.clear() ws = self._ws if ws is None: continue # 按帧大小分片发送 offset = 0 while offset + frame_bytes <= len(pending): chunk = pending[offset:offset + frame_bytes] try: asyncio.run_coroutine_threadsafe(ws.send(chunk), self._loop) send_count += 1 except Exception as e: logger.debug("[Recorder] WS send err: %s", e) break offset += frame_bytes # 每 ~5s 打一次发送统计 if send_count > 0 and send_count % 50 == 0: logger.info("[Recorder] WS sent %d frames (%.1fs)", send_count, send_count * CHUNK_DURATION_MS / 1000) async def _ws_recv_loop(self): """接收 Cloud ASR 的转写结果。""" try: async for msg in self._ws: try: data = json.loads(msg) msg_type = data.get("type", "") text = data.get("text", "") if msg_type in ("partial", "final") and text: logger.info("[Recorder] %s: %s", msg_type, text[:50]) if self._on_text: self._on_text(msg_type, text) elif msg_type == "error": logger.error("[Recorder] ASR error: %s", data.get("message")) except (json.JSONDecodeError, TypeError): pass except Exception as e: if self._running: logger.warning("[Recorder] WS recv 断开: %s", e) # ── 工具函数 ────────────────────────────────────────────── def _sync_get_sharable_content(): """同步获取 SCShareableContent(阻塞等待 async 回调)。""" from ScreenCaptureKit import SCShareableContent result = [None] event = threading.Event() def _handler(content, error): if error: logger.error("[Recorder] SCShareableContent error: %s", error) result[0] = content event.set() SCShareableContent.getShareableContentExcludingDesktopWindows_onScreenWindowsOnly_completionHandler_( False, True, _handler ) event.wait(timeout=5) return result[0] # ── SCStream 
# ── SCStream Delegate ─────────────────────────────────────

_SCStreamDelegate = None


def _define_sc_delegate():
    """Build the SCStream output-handler class on demand.

    The pyobjc imports live inside this function so that importing the
    module does not require pyobjc.

    Returns:
        The NSObject subclass that receives SCStream sample buffers.
    """
    from Foundation import NSObject
    from ScreenCaptureKit import SCStreamOutputTypeAudio
    import CoreMedia

    class _Delegate(NSObject):
        """Receives SCStream audio samples and forwards them to the recorder."""

        # Assigned by the recorder after the instance is created.
        _recorder = None

        def stream_didOutputSampleBuffer_ofType_(self, stream, sample_buffer, output_type):
            # Only audio samples are of interest.
            if output_type != SCStreamOutputTypeAudio:
                return
            # Drop samples while no recorder is attached or it is not running.
            owner = self._recorder
            if not owner or not owner.is_running:
                return

            try:
                data_buffer = CoreMedia.CMSampleBufferGetDataBuffer(sample_buffer)
                if data_buffer is None:
                    return

                n_bytes = CoreMedia.CMBlockBufferGetDataLength(data_buffer)
                # CMBlockBufferCopyDataBytes returns (OSStatus, bytes_data)
                os_status, payload = CoreMedia.CMBlockBufferCopyDataBytes(
                    data_buffer, 0, n_bytes, None
                )
                if os_status == 0 and payload:
                    self._recorder._on_system_audio(payload)
            except Exception as e:
                logger.warning("[Recorder] SCStream sample error: %s", e, exc_info=True)

    return _Delegate


def _ensure_delegate_class():
    """Return the delegate class, defining and caching it on first call."""
    global _SCStreamDelegate
    if _SCStreamDelegate is not None:
        return _SCStreamDelegate
    _SCStreamDelegate = _define_sc_delegate()
    return _SCStreamDelegate