import asyncio
import hashlib
import os
import time
import logging
import json
import re
from pathlib import Path

from hermes_constants import get_hermes_home

logger = logging.getLogger(__name__)


async def process_uploaded_material_oss(
    oss_key: str,
    original_filename: str,
    context_id: str,
    push_event_fn,
    user_id: str,
    org_id: str = "org_001",
):
    """Ingest an uploaded document from OSS into the deepview material store.

    Pipeline:
      1. Download the native file from OSS.
      2. md5-hash the bytes (stable file id, used for dedup/idempotency).
      3. Route to a storage location based on ``context_id``
         (dual-context FS-as-database layout).
      4. Sniff PDFs to choose between the VLM and pure-text pipelines;
         dispatch other extensions to their extractors to produce a ``.md``.
      5. Record the material in the ``deepview_materials`` table and push
         an SSE ``material:done`` event via ``push_event_fn``.

    Errors are logged and swallowed (best-effort ingestion); the function
    never raises to the caller.
    """
    # 1. Fetch from OSS.
    oss_ak = os.getenv("ALIYUN_ACCESS_KEY_ID", "")
    oss_sk = os.getenv("ALIYUN_ACCESS_KEY_SECRET", "")
    oss_bucket = os.getenv("OSS_BUCKET", "meetings-dev")
    oss_endpoint = os.getenv("OSS_ENDPOINT", "oss-cn-beijing.aliyuncs.com")
    if not (oss_ak and oss_sk):
        logger.error("[DeepviewMaterials] Missing OSS keys in backend .env. Skipping ingestion.")
        return
    try:
        import oss2

        auth = oss2.Auth(oss_ak, oss_sk)
        bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket)
        logger.info(f"[DeepviewMaterials] Over internal network fetching {oss_key}...")
        file_bytes = bucket.get_object(oss_key).read()
    except Exception as e:
        logger.error(f"[DeepviewMaterials] OSS Download failed: {e}")
        return

    # 2. Content hash — doubles as the material's primary id.
    file_hash = hashlib.md5(file_bytes).hexdigest()

    # 3. Context routing (deepview dual-context FS-as-database pipeline).
    storage_root = Path(
        os.getenv(
            "DEEPVIEW_STORAGE_DIR",
            os.path.expanduser("~/Downloads/Coding/医生助理智能体/backend/storage"),
        )
    )
    user_dir = storage_root / "users" / (user_id or "unknown")
    org_dir = storage_root / "orgs" / (org_id or "org_001")
    platform_dir = storage_root / "platform"

    ext = os.path.splitext(original_filename)[1].lower()
    # Reject potentially dangerous file types outright.
    if ext in (".url", ".exe", ".sh"):
        logger.warning(f"Rejected unsafe file: {original_filename}")
        return

    base_dir, raw_path, md_path = _resolve_material_paths(
        context_id, file_hash, ext, user_dir, org_dir, platform_dir
    )
    base_dir.mkdir(parents=True, exist_ok=True)

    # Idempotency: if the .md already exists, just re-emit the done event.
    if md_path.exists():
        logger.info(f"[DeepviewMaterials] File {original_filename} already processed for {context_id}.")
        push_event_fn(
            user_id,
            "material:done",
            {"projectId": context_id, "filename": original_filename, "fileId": file_hash},
        )
        return

    # Keep the original binary for provenance and retries.
    raw_path.write_bytes(file_bytes)

    # get_running_loop(): we are inside a coroutine; get_event_loop() is
    # deprecated here since Python 3.10.
    loop = asyncio.get_running_loop()
    try:
        if ext == ".pdf":
            # Sniff layout/text density to choose the extraction pipeline.
            if _is_vlm_needed(str(raw_path)):
                logger.info("[DeepviewMaterials] Sniffer: Routing to OSS+VLM Pipeline.")
                await loop.run_in_executor(
                    None,
                    _extract_pdf_vlm,
                    str(raw_path),
                    str(md_path),
                    bucket,
                    file_hash,
                    original_filename,
                )
            else:
                logger.info("[DeepviewMaterials] Sniffer: Routing to Pymupdf4llm text Pipeline.")
                await loop.run_in_executor(
                    None, _extract_pdf_to_md, str(raw_path), str(md_path), original_filename
                )
        elif ext in (".docx", ".doc"):
            await loop.run_in_executor(
                None, _extract_docx, str(raw_path), str(md_path), original_filename
            )
        elif ext in (".txt", ".md", ".csv"):
            await loop.run_in_executor(
                None, _extract_txt, str(raw_path), str(md_path), original_filename
            )
        elif ext in (".m4a", ".mp3", ".wav", ".webm", ".ogg"):
            await loop.run_in_executor(
                None,
                _extract_audio_asr,
                str(raw_path),
                str(md_path),
                original_filename,
                bucket,
                oss_key,
            )
        else:
            await loop.run_in_executor(None, _simulate_extract, str(md_path), original_filename)

        # Persist to the deepview_materials table (DB is the single source
        # of truth); best-effort — extraction output survives a DB failure.
        try:
            from hermes_state import SessionDB

            db = SessionDB()

            def _do(conn):
                conn.execute(
                    "INSERT OR IGNORE INTO deepview_materials (id, filename, context_id, user_id, source, created_at) "
                    "VALUES (?, ?, ?, ?, 'upload', ?)",
                    (file_hash, original_filename, context_id, user_id, time.time()),
                )

            db._execute_write(_do)
        except Exception as e:
            logger.warning(f"[DeepviewMaterials] Failed to persist to DB: {e}")

        # Emit SSE success.
        logger.info(f"[DeepviewMaterials] Successfully ingested {original_filename}")
        push_event_fn(
            user_id,
            "material:done",
            {"projectId": context_id, "filename": original_filename, "fileId": file_hash},
        )
    except Exception as e:
        logger.error(f"[DeepviewMaterials] Failed to extract {original_filename}: {e}", exc_info=True)


def _resolve_material_paths(
    context_id: str,
    file_hash: str,
    ext: str,
    user_dir: Path,
    org_dir: Path,
    platform_dir: Path,
):
    """Map a context id to ``(base_dir, raw_path, md_path)`` in the FS store.

    ``raw_path`` holds the original binary, ``md_path`` the extracted
    markdown. Unknown context ids land in a quarantine ``misc`` area.
    """
    if context_id.startswith("recording:"):
        recording_id = context_id.split(":", 1)[1]
        parts = recording_id.split("/")
        if len(parts) > 1:
            # Legacy (archived) format: recording:clientId/asrId
            client_id = parts[0]
            asr_id = parts[-1]
            base_dir = user_dir / "clients" / client_id / "history"
            raw_path = base_dir / f"{asr_id}_raw{ext}"
            md_path = base_dir / f"{asr_id}.md"
        else:
            # New (inbox) format: recording:asrId
            asr_id = parts[0]
            base_dir = user_dir / "inbox" / asr_id
            raw_path = base_dir / f"asr_raw{ext}"
            md_path = base_dir / "asr.md"
    elif context_id.startswith("client:"):
        client_id = context_id.split(":", 1)[1]
        # Client panorama context: fragmented documents (contracts,
        # medical records, receipts, ...) keyed by content hash.
        base_dir = user_dir / "clients" / client_id
        raw_path = base_dir / f"{file_hash}_raw{ext}"
        md_path = base_dir / f"{file_hash}.md"
    elif context_id.startswith("wiki:") or context_id == "deepview":
        # Store into this organisation's wiki library.
        base_dir = org_dir / "wiki"
        raw_path = base_dir / f"{file_hash}_raw{ext}"
        md_path = base_dir / f"{file_hash}.md"
    elif context_id.startswith("platform:"):
        # Reserved platform-operations entry point.
        base_dir = platform_dir / "wiki"
        raw_path = base_dir / f"{file_hash}_raw{ext}"
        md_path = base_dir / f"{file_hash}.md"
    elif context_id.startswith("doctor:"):
        base_dir = user_dir
        raw_path = base_dir / f"doctor_profile_raw{ext}"
        md_path = base_dir / "doctor_profile.md"
    else:
        # Fallback quarantine area for unrecognised contexts.
        base_dir = user_dir / "misc" / context_id.replace(":", "_")
        raw_path = base_dir / f"{file_hash}_raw{ext}"
        md_path = base_dir / f"{file_hash}.md"
    return base_dir, raw_path, md_path


def _is_vlm_needed(pdf_path: str) -> bool:
    """Heuristically decide whether a PDF needs the VLM pipeline.

    Checks the first two pages: a landscape page (aspect ratio > 1.2)
    suggests a slide deck, and very low text density (< 100 chars/page)
    suggests a scanned document — both route to VLM. Returns ``False`` on
    any error so extraction falls back to the plain text pipeline.
    """
    try:
        import fitz

        doc = fitz.open(pdf_path)
        if len(doc) == 0:
            return False
        check_pages = min(2, len(doc))
        total_text_len = 0
        vlm_vote = 0
        for i in range(check_pages):
            page = doc[i]
            rect = page.rect
            width, height = rect.width, rect.height
            if width > height and (width / height) > 1.2:
                vlm_vote += 1
            text = page.get_text()
            total_text_len += len(text.strip())
        doc.close()
        # Any landscape page => PPT-style deck => VLM.
        if vlm_vote > 0:
            return True
        # Extremely low text density => scanned document => VLM.
        if check_pages > 0 and (total_text_len / check_pages) < 100:
            return True
        return False
    except Exception as e:
        logger.error(f"Sniffer error: {e}")
        return False  # Fall back to the pymupdf text pipeline.


def _extract_pdf_vlm(pdf_path: str, md_path: str, bucket, file_hash: str, original_filename: str):
    """Render each PDF page to PNG, upload to OSS, and transcribe via VLM.

    Uses the server-side LiteLLM gateway (configured via GEMINI_API_KEY /
    GEMINI_BASE_URL); writes one "## 第 N 页" markdown section per page.
    If the gateway is not configured, writes a skip notice instead.
    """
    import fitz
    import tempfile
    from openai import OpenAI

    # Unified through the server-side LiteLLM proxy pipeline (the separate
    # DASHSCOPE authorisation path is fully retired).
    litellm_key = os.getenv("GEMINI_API_KEY")
    litellm_base = os.getenv("GEMINI_BASE_URL", "http://127.0.0.1:4000/v1")
    vlm_model = os.getenv("DEEPVIEW_MODEL", "gemini3.1pro-vertex")

    with tempfile.TemporaryDirectory() as tmp_dir:
        doc = fitz.open(pdf_path)
        zoom = 200 / 72  # render at ~200 DPI
        mat = fitz.Matrix(zoom, zoom)
        urls = []
        for i, page in enumerate(doc):
            pix = page.get_pixmap(matrix=mat)
            img_path = os.path.join(tmp_dir, f"page_{i + 1:03d}.png")
            pix.save(img_path)
            pix = None  # release pixmap memory promptly
            oss_key = f"deepview-assets/{file_hash}/slides/page_{i+1:03d}.png"
            bucket.put_object_from_file(oss_key, img_path)
            # 24h signed URL so the VLM can fetch the image.
            url = bucket.sign_url('GET', oss_key, 3600 * 24)
            urls.append(url)
        doc.close()

        if not litellm_key or not litellm_base:
            with open(md_path, "w", encoding="utf-8") as f:
                f.write(
                    f"# Document: VLM Extraction Skipped\n\n缺少LiteLLM网关配置 (GEMINI_API_KEY/GEMINI_BASE_URL)。已将 {len(urls)} 页图片传至 OSS。"
                )
            return

        client = OpenAI(base_url=litellm_base, api_key=litellm_key, timeout=120)
        markdown_blocks = []
        for i, u in enumerate(urls):
            prompt = "详细分析这张页面图片。如果是PPT请提炼核心观点、标题和要素。如果是扫描件请保留每一段具体文字。用纯粹的Markdown格式输出,不要使用```markdown包裹。"
            try:
                resp = client.chat.completions.create(
                    model=vlm_model,
                    messages=[{
                        "role": "user",
                        "content": [
                            {"type": "image_url", "image_url": {"url": u}},
                            {"type": "text", "text": prompt},
                        ],
                    }],
                    max_tokens=2048,
                    temperature=0.1,
                )
                # content may be None (e.g. refusal); coerce before strip.
                txt = (resp.choices[0].message.content or "").strip()
                if txt.startswith("```markdown"):
                    txt = txt[11:]
                    txt = txt.strip("`\n ")
                markdown_blocks.append(f"## 第 {i+1} 页\n\n{txt}\n")
            except Exception as e:
                logger.error(f"LiteLLM VLM error page {i+1}: {e}")
                markdown_blocks.append(f"## 第 {i+1} 页\n*(视觉提取超时或失败)*\n")

        with open(md_path, "w", encoding="utf-8") as f:
            f.write(f"# Document: {original_filename}\n\n")
            f.write("\n---\n".join(markdown_blocks))


def _extract_pdf_to_md(pdf_path: str, md_path: str, original_filename: str):
    """Extract a text-native PDF to markdown via pymupdf4llm.

    Falls back to ``_simulate_extract`` on any failure.
    """
    try:
        import pymupdf4llm

        md_text = pymupdf4llm.to_markdown(pdf_path)
        with open(md_path, "w", encoding="utf-8") as f:
            f.write(f"# Document: {original_filename}\n\n")
            f.write(md_text)
    except Exception as e:
        logger.error(f"pymupdf4llm error: {e}")
        _simulate_extract(md_path, os.path.basename(pdf_path))


def _extract_docx(docx_path: str, md_path: str, original_filename: str):
    """Extract non-empty paragraphs from a .docx into markdown.

    Falls back to ``_simulate_extract`` on any failure (including legacy
    .doc files, which python-docx cannot open).
    """
    try:
        import docx

        doc = docx.Document(docx_path)
        text = "\n\n".join(p.text for p in doc.paragraphs if p.text.strip())
        with open(md_path, "w", encoding="utf-8") as f:
            f.write(f"# Document: {original_filename}\n\n")
            f.write(text)
    except Exception as e:
        logger.error(f"docx error: {e}")
        _simulate_extract(md_path, os.path.basename(docx_path))


def _extract_txt(txt_path: str, md_path: str, original_filename: str):
    """Copy a plain-text file into the markdown store.

    Reads as UTF-8 with ``errors="replace"`` so a stray non-UTF-8 byte
    degrades to a replacement character instead of losing the whole file
    to the simulate fallback.
    """
    try:
        with open(txt_path, "r", encoding="utf-8", errors="replace") as f:
            text = f.read()
        with open(md_path, "w", encoding="utf-8") as f:
            f.write(f"# Document: {original_filename}\n\n")
            f.write(text)
    except Exception as e:
        logger.error(f"txt error: {e}")
        _simulate_extract(md_path, os.path.basename(txt_path))


def _extract_audio_asr(audio_path: str, md_path: str, original_filename: str, bucket, oss_key: str):
    """Transcribe an audio file via DashScope's async long-audio ASR.

    Submits a paraformer-v2 task with speaker diarization enabled, polls
    for completion (bounded — see ``max_polls``), and writes a markdown
    transcript with one paragraph per speaker turn. Any failure path falls
    back to ``_simulate_extract`` with a descriptive reason.
    """
    import requests

    try:
        dashscope_key = os.getenv("DASHSCOPE_API_KEY", "")
        if not dashscope_key:
            logger.error("[DeepviewMaterials] Missing DASHSCOPE_API_KEY for ASR.")
            _simulate_extract(md_path, f"{original_filename} (ASR Failed: No DASHSCOPE_API_KEY)")
            return

        logger.info(f"[DeepviewMaterials] Submitting Long Audio ASR Task for {original_filename}")

        # 1. Sign the OSS URL the frontend uploaded to, so DashScope's
        #    async long-audio service can fetch it (24h validity).
        audio_url = bucket.sign_url('GET', oss_key, 3600 * 24)

        # 2. Submit the async transcription task.
        headers = {
            "Authorization": f"Bearer {dashscope_key}",
            "Content-Type": "application/json",
            "X-DashScope-Async": "enable",
        }
        payload = {
            "model": "paraformer-v2",  # long-audio engine; the only offline async service with diarization
            "input": {"file_urls": [audio_url]},
            "parameters": {
                "diarization_enabled": True  # speaker separation by voice timbre
            },
        }
        resp = requests.post(
            "https://dashscope.aliyuncs.com/api/v1/services/audio/asr/transcription",
            json=payload,
            headers=headers,
            timeout=60,
        )
        if resp.status_code != 200:
            logger.error(f"[DeepviewMaterials] ASR Submit Failed: {resp.text}")
            _simulate_extract(md_path, f"{original_filename} (ASR Submit Failed)")
            return

        task_id = resp.json()["output"]["task_id"]
        logger.info(f"[DeepviewMaterials] ASR Task Submitted: {task_id}. Polling...")

        # 3. Poll for the result. Bounded: 1200 polls x 3s ≈ 1 hour max,
        #    so a stuck task cannot pin this executor thread forever.
        polling_url = f"https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}"
        max_polls = 1200
        for _ in range(max_polls):
            status_resp = requests.get(polling_url, headers=headers, timeout=30)
            if status_resp.status_code != 200:
                logger.error(f"[DeepviewMaterials] ASR Polling HTTP Error: {status_resp.text}")
                break
            data = status_resp.json()
            status = data["output"]["task_status"]
            if status == "SUCCEEDED":
                result_url = data["output"]["results"][0]["transcription_url"]
                result_resp = requests.get(result_url, timeout=60)
                transcripts = result_resp.json().get("transcripts", [])
                if not transcripts:
                    with open(md_path, "w", encoding="utf-8") as f:
                        f.write(f"# 🎵 面诊录音: {original_filename}\n\n*(未能提取出任何语音)*")
                    return
                sentences = transcripts[0].get("sentences", [])

                # 4. Format to disk: new paragraph on each speaker change.
                with open(md_path, "w", encoding="utf-8") as f:
                    f.write(f"# 🎵 面诊录音: {original_filename}\n\n")
                    last_speaker = None
                    for s in sentences:
                        # Paraformer typically returns spk_0, spk_1, ... from
                        # timbre clustering.
                        spk = s.get("speaker_id", "Unknown")
                        spk_label = f"**说话人 {spk}**" if spk != "Unknown" else "**未知说话人**"
                        text = s.get("text", "")
                        if spk != last_speaker:
                            f.write(f"\n\n{spk_label}: {text}")
                            last_speaker = spk
                        else:
                            f.write(f" {text}")
                logger.info(f"[DeepviewMaterials] ASR Diarization successfully completed for {original_filename}")
                return
            elif status == "FAILED":
                logger.error(f"[DeepviewMaterials] ASR Task Failed Internally: {data}")
                break
            time.sleep(3)  # polling interval

        # Fallback: HTTP error, task failure, or poll budget exhausted.
        _simulate_extract(md_path, f"{original_filename} (ASR Polling Failed or Timeout)")
    except Exception as e:
        logger.error(f"[DeepviewMaterials] audio error: {e}", exc_info=True)
        _simulate_extract(md_path, os.path.basename(audio_path))


def _simulate_extract(md_path: str, original_filename: str):
    """Last-resort extractor: write a stub markdown noting the fallback."""
    with open(md_path, "w", encoding="utf-8") as f:
        f.write(f"# Document: {original_filename}\n\nNotice: Extracted via simple fallback parser.")