MindOS_CLI/mindcli/recorder.py
lidf 69dd868e2f init: MindOS CLI 本地执行体(从 mindOSv2/mindos-cli 独立)
- 独立 pyproject.toml(pip install -e .)
- vendor_hermes.sh 已改为显式路径模式(不再依赖相对目录)
- 包含 hermes vendor 快照
2026-04-28 13:12:54 +08:00

437 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Mind CLI — 系统拾音引擎 (Phase 4)
双层架构·音频服务层:
CLI 作为音频服务层的一个独立源,只负责采集系统音频。
浏览器/APK 负责麦克风。双工模式下两路各走各的 Cloud ASR session。
支持两种采集模式(由调用方指定):
- "system"(默认): ScreenCaptureKit 系统音频(macOS only)
- "mic": sounddevice 麦克风(无浏览器时的 fallback 场景)
不做混音。混音 = 伪需求。"关联两份转写"是 Agent 层的智力工作。
Cloud 端复用 dashscope_realtime.py 管线,零新增代码。
协议与前端 RecordingService 完全一致:
客户端→服务端: binary PCM16 帧 / {"type":"stop"}
服务端→客户端: {"type":"partial"|"final","text":"..."} / {"type":"error","message":"..."}
"""
import asyncio
import json
import logging
import threading
import time
from typing import Callable, Literal
logger = logging.getLogger("mindcli.recorder")

# ── Constants ─────────────────────────────────────────────
# Cloud ASR expects 16 kHz audio.
TARGET_SAMPLE_RATE = 16_000
# Each frame pushed over the WebSocket covers 100 ms of audio.
CHUNK_DURATION_MS = 100
# Samples per frame: 16000 Hz * 100 ms / 1000 = 1600.
CHUNK_SAMPLES = (TARGET_SAMPLE_RATE * CHUNK_DURATION_MS) // 1000

# ── Global singleton slot (populated lazily by get_recorder) ──
_recorder: "SystemRecorder | None" = None
def get_recorder() -> "SystemRecorder":
    """Return the module-wide SystemRecorder, constructing it on the first call."""
    global _recorder
    if _recorder is not None:
        return _recorder
    _recorder = SystemRecorder()
    return _recorder
class SystemRecorder:
"""
系统拾音引擎 — 音频服务层的一个独立源。
只负责单一音频源的采集 + WS 推送。
不做混音(双工模式下浏览器和 CLI 各自独立推送到 Cloud ASR
"""
def __init__(self):
self._running = False
self._ws = None
self._source: str = "system"
self._start_time = 0.0
self._chat_id = ""
self._meeting_id = ""
# 音频缓冲区(线程安全)
self._audio_buf: bytearray = bytearray()
self._buf_lock = threading.Lock()
# 采集资源
self._mic_stream = None # sounddevice.InputStream
self._sc_stream = None # SCStream
self._sc_delegate = None
# 推送线程
self._push_thread: threading.Thread | None = None
# asyncio 事件循环引用start 时保存)
self._loop: asyncio.AbstractEventLoop | None = None
# 事件回调
self._on_text: Callable[[str, str], None] | None = None
@property
def is_running(self) -> bool:
return self._running
def status(self) -> dict:
"""返回当前录音状态。"""
return {
"running": self._running,
"source": self._source if self._running else None,
"duration": round(time.time() - self._start_time, 1) if self._running else 0,
"chatId": self._chat_id,
"meetingId": self._meeting_id,
}
async def start(
self,
ws_url: str,
chat_id: str = "",
meeting_id: str = "",
source: Literal["system", "mic"] = "system",
on_text: Callable[[str, str], None] | None = None,
) -> dict:
"""
开始录音(单一源)。
Args:
ws_url: Cloud ASR WebSocket URL含 token/chatId/meetingId query
chat_id: 对话 ID
meeting_id: 录音批次 ID
source: "system"ScreenCaptureKit"mic"sounddevice
on_text: 收到转写文本的回调 (type, text)
Returns:
{"ok": True, "meetingId": "..."} 或 {"error": "..."}
"""
if self._running:
return {"error": "录音已在进行中"}
self._chat_id = chat_id
self._meeting_id = meeting_id or f"cli_rec_{int(time.time() * 1000)}"
self._source = source
self._on_text = on_text
self._audio_buf.clear()
self._loop = asyncio.get_running_loop()
# 1. 连接 Cloud ASR WebSocket
try:
import websockets
self._ws = await websockets.connect(ws_url)
logger.info("[Recorder] WS 已连接: %s", ws_url[:80])
except Exception as e:
logger.error("[Recorder] WS 连接失败: %s", e)
return {"error": f"WebSocket 连接失败: {e}"}
# 2. 启动音频采集
try:
if source == "system":
self._start_system_audio()
else:
self._start_mic()
except Exception as e:
logger.error("[Recorder] 音频源 '%s' 启动失败: %s", source, e)
await self._ws.close()
self._ws = None
return {"error": f"音频源启动失败: {e}"}
# 3. 启动推送线程
self._running = True
self._start_time = time.time()
self._push_thread = threading.Thread(target=self._push_loop, daemon=True)
self._push_thread.start()
# 4. 启动 WS 接收协程(转写结果)
asyncio.create_task(self._ws_recv_loop())
logger.info("[Recorder] 录音已开始 source=%s chatId=%s meetingId=%s",
source, chat_id, self._meeting_id)
return {"ok": True, "meetingId": self._meeting_id, "source": source}
async def stop(self) -> dict:
"""停止录音。"""
if not self._running:
return {"error": "未在录音"}
self._running = False
duration = round(time.time() - self._start_time, 1)
# 停止音频源
self._stop_capture()
# 等待推送线程结束
if self._push_thread and self._push_thread.is_alive():
self._push_thread.join(timeout=3)
# 发送 stop 命令
if self._ws:
try:
await self._ws.send(json.dumps({"type": "stop"}))
await asyncio.sleep(1)
await self._ws.close()
except Exception:
pass
self._ws = None
logger.info("[Recorder] 录音已停止 duration=%.1fs", duration)
return {"ok": True, "duration": duration, "meetingId": self._meeting_id}
# ── 麦克风采集sounddevice────────────────────────────
def _start_mic(self):
"""启动麦克风捕获 16kHz mono int16。"""
import sounddevice as sd
def _callback(indata, frames, time_info, status):
if status:
logger.debug("[Recorder] mic status: %s", status)
with self._buf_lock:
self._audio_buf.extend(indata.tobytes())
self._mic_stream = sd.InputStream(
samplerate=TARGET_SAMPLE_RATE,
channels=1,
dtype="int16",
blocksize=CHUNK_SAMPLES,
callback=_callback,
)
self._mic_stream.start()
logger.info("[Recorder] 麦克风已启动 @%dHz", TARGET_SAMPLE_RATE)
# ── 系统音频采集ScreenCaptureKit─────────────────────
def _start_system_audio(self):
"""启动 macOS 系统音频捕获。"""
import platform
if platform.system() != "Darwin":
raise RuntimeError("系统音频仅支持 macOS")
try:
from ScreenCaptureKit import (
SCStream,
SCStreamConfiguration,
SCContentFilter,
SCStreamOutputTypeAudio,
)
from dispatch import dispatch_queue_create, DISPATCH_QUEUE_SERIAL
except ImportError as e:
raise RuntimeError(
f"缺少 pyobjc 依赖,请运行: pip install mindos-cli[audio]\n{e}"
)
# 获取主显示器
content = _sync_get_sharable_content()
if not content or not content.displays():
raise RuntimeError("无法获取显示器列表")
display = content.displays()[0]
# 配置:只捕获音频
config = SCStreamConfiguration.alloc().init()
config.setCapturesAudio_(True)
config.setExcludesCurrentProcessAudio_(True)
config.setChannelCount_(1)
config.setSampleRate_(float(TARGET_SAMPLE_RATE))
# 内容过滤器
content_filter = SCContentFilter.alloc().initWithDisplay_excludingWindows_(
display, []
)
# 创建 delegate
_ensure_delegate_class()
self._sc_delegate = _SCStreamDelegate.alloc().init()
self._sc_delegate._recorder = self
# 创建 stream
self._sc_stream = SCStream.alloc().initWithFilter_configuration_delegate_(
content_filter, config, None
)
# dispatch queue
queue = dispatch_queue_create(b"mindcli.audio", DISPATCH_QUEUE_SERIAL)
self._sc_stream.addStreamOutput_type_sampleHandlerQueue_error_(
self._sc_delegate, SCStreamOutputTypeAudio, queue, None
)
# 启动
event = threading.Event()
error_holder = [None]
def _on_start(error):
if error:
error_holder[0] = str(error)
event.set()
self._sc_stream.startCaptureWithCompletionHandler_(_on_start)
event.wait(timeout=5)
if error_holder[0]:
raise RuntimeError(f"SCStream 启动失败: {error_holder[0]}")
logger.info("[Recorder] 系统音频已启动 (ScreenCaptureKit @%dHz)", TARGET_SAMPLE_RATE)
def _on_system_audio(self, raw_bytes: bytes):
"""系统音频回调。SCStream 输出 float32 PCM需要转为 int16。"""
import struct
# float32: 每个样本 4 字节int16: 每个样本 2 字节
n_samples = len(raw_bytes) // 4
if n_samples == 0:
return
# 解包 float32
floats = struct.unpack(f'<{n_samples}f', raw_bytes[:n_samples * 4])
# clamp [-1, 1] → scale to int16 range
int16_data = struct.pack(f'<{n_samples}h',
*(max(-32768, min(32767, int(s * 32767))) for s in floats)
)
with self._buf_lock:
self._audio_buf.extend(int16_data)
# ── 停止采集 ─────────────────────────────────────────────
def _stop_capture(self):
"""停止当前音频源。"""
# 麦克风
if self._mic_stream:
try:
self._mic_stream.stop()
self._mic_stream.close()
except Exception:
pass
self._mic_stream = None
# 系统音频
if self._sc_stream:
event = threading.Event()
self._sc_stream.stopCaptureWithCompletionHandler_(lambda e: event.set())
event.wait(timeout=3)
self._sc_stream = None
self._sc_delegate = None
# ── WS 推送 ──────────────────────────────────────────────
def _push_loop(self):
"""后台线程:定时取缓冲区 → WS 推送 PCM16 帧。"""
frame_bytes = CHUNK_SAMPLES * 2 # int16 = 2 bytes/sample = 3200
send_count = 0
while self._running:
time.sleep(CHUNK_DURATION_MS / 1000.0)
# 一次取尽缓冲区
with self._buf_lock:
if len(self._audio_buf) < frame_bytes:
continue
pending = bytes(self._audio_buf)
self._audio_buf.clear()
ws = self._ws
if ws is None:
continue
# 按帧大小分片发送
offset = 0
while offset + frame_bytes <= len(pending):
chunk = pending[offset:offset + frame_bytes]
try:
asyncio.run_coroutine_threadsafe(ws.send(chunk), self._loop)
send_count += 1
except Exception as e:
logger.debug("[Recorder] WS send err: %s", e)
break
offset += frame_bytes
# 每 ~5s 打一次发送统计
if send_count > 0 and send_count % 50 == 0:
logger.info("[Recorder] WS sent %d frames (%.1fs)",
send_count, send_count * CHUNK_DURATION_MS / 1000)
async def _ws_recv_loop(self):
"""接收 Cloud ASR 的转写结果。"""
try:
async for msg in self._ws:
try:
data = json.loads(msg)
msg_type = data.get("type", "")
text = data.get("text", "")
if msg_type in ("partial", "final") and text:
logger.info("[Recorder] %s: %s", msg_type, text[:50])
if self._on_text:
self._on_text(msg_type, text)
elif msg_type == "error":
logger.error("[Recorder] ASR error: %s", data.get("message"))
except (json.JSONDecodeError, TypeError):
pass
except Exception as e:
if self._running:
logger.warning("[Recorder] WS recv 断开: %s", e)
# ── Helpers ───────────────────────────────────────────────
def _sync_get_sharable_content(timeout: float = 5.0):
    """
    Fetch ``SCShareableContent`` synchronously.

    Issues the completion-handler based request and blocks on a
    ``threading.Event`` until the handler fires or *timeout* seconds pass.

    Args:
        timeout: Maximum seconds to wait for the completion handler.

    Returns:
        The content object passed to the handler (possibly ``None`` when the
        handler reported an error), or ``None`` if the handler did not fire
        within *timeout*.
    """
    from ScreenCaptureKit import SCShareableContent

    result = [None]
    event = threading.Event()

    def _handler(content, error):
        if error:
            logger.error("[Recorder] SCShareableContent error: %s", error)
        result[0] = content
        event.set()

    SCShareableContent.getShareableContentExcludingDesktopWindows_onScreenWindowsOnly_completionHandler_(
        False, True, _handler
    )
    if not event.wait(timeout=timeout):
        # Without this log the caller only reports "no displays", hiding the
        # real cause (the request never completed).
        logger.warning("[Recorder] SCShareableContent 获取超时 (%.1fs)", timeout)
        return None
    return result[0]
# ── SCStream Delegate ────────────────────────────────────
# Cached delegate class returned by _define_sc_delegate(). It is created lazily
# by _ensure_delegate_class() so that importing this module does not require pyobjc.
_SCStreamDelegate = None
def _define_sc_delegate():
    """
    Define and return the SCStream output delegate class.

    Definition is deferred to call time so that importing this module does not
    require pyobjc. Call it through _ensure_delegate_class(), which caches the
    result.
    """
    from Foundation import NSObject
    from ScreenCaptureKit import SCStreamOutputTypeAudio
    import CoreMedia

    class _Delegate(NSObject):
        """Delegate that receives SCStream sample buffers and forwards audio to the recorder."""

        # Owning SystemRecorder; assigned right after alloc().init() by the recorder.
        _recorder = None

        # NOTE(review): the method name follows PyObjC's selector naming
        # (underscores -> colons) and presumably must match the SCStreamOutput
        # selector stream:didOutputSampleBuffer:ofType: — do not rename.
        def stream_didOutputSampleBuffer_ofType_(self, stream, sample_buffer, output_type):
            # Only audio buffers are of interest.
            if output_type != SCStreamOutputTypeAudio:
                return
            # Drop samples when not attached to a recorder or the recorder has stopped.
            if not self._recorder or not self._recorder.is_running:
                return
            try:
                block_buf = CoreMedia.CMSampleBufferGetDataBuffer(sample_buffer)
                if block_buf is None:
                    return
                length = CoreMedia.CMBlockBufferGetDataLength(block_buf)
                # CMBlockBufferCopyDataBytes returns (OSStatus, bytes_data); 0 means success.
                status, raw_data = CoreMedia.CMBlockBufferCopyDataBytes(block_buf, 0, length, None)
                if status == 0 and raw_data:
                    self._recorder._on_system_audio(raw_data)
            except Exception as e:
                # Log and keep the stream alive; one bad buffer should not stop capture.
                logger.warning("[Recorder] SCStream sample error: %s", e, exc_info=True)

    return _Delegate
def _ensure_delegate_class():
    """
    Return the SCStream delegate class, defining it only on the first call.

    The class is cached in the module-level _SCStreamDelegate. Presumably this
    also avoids registering the Objective-C class twice.
    """
    global _SCStreamDelegate
    if _SCStreamDelegate is not None:
        return _SCStreamDelegate
    _SCStreamDelegate = _define_sc_delegate()
    return _SCStreamDelegate