doctorAI/backend/gateway/platforms/deepview_materials.py

404 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import hashlib
import os
import time
import logging
import json
import re
from pathlib import Path
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
async def process_uploaded_material_oss(
    oss_key: str,
    original_filename: str,
    context_id: str,
    push_event_fn,
    user_id: str,
    org_id: str = "org_001"
):
    """Ingest one uploaded material from OSS into the FS-as-database store.

    1. Downloads the native file from OSS
    2. Runs deduplication (md5 content hash doubles as the file identity)
    3. Sniffs PDFs to decide between the VLM and pure-text pipelines
    4. Executes the relevant pipeline to produce a .md, records the material
       in the ``deepview_materials`` table, and pushes a ``material:done``
       SSE event to the uploading user.

    NOTE(review): failure paths emit no client-facing event — callers only
    ever see ``material:done``; errors are logged server-side only.
    """
    # 1. Fetch from OSS.
    oss_ak = os.getenv("ALIYUN_ACCESS_KEY_ID", "")
    oss_sk = os.getenv("ALIYUN_ACCESS_KEY_SECRET", "")
    oss_bucket = os.getenv("OSS_BUCKET", "meetings-dev")
    oss_endpoint = os.getenv("OSS_ENDPOINT", "oss-cn-beijing.aliyuncs.com")
    if not (oss_ak and oss_sk):
        logger.error("[DeepviewMaterials] Missing OSS keys in backend .env. Skipping ingestion.")
        return
    try:
        import oss2
        auth = oss2.Auth(oss_ak, oss_sk)
        bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket)
        logger.info(f"[DeepviewMaterials] Over internal network fetching {oss_key}...")
        file_bytes = bucket.get_object(oss_key).read()
    except Exception as e:
        logger.error(f"[DeepviewMaterials] OSS Download failed: {e}")
        return

    # 2. Content hash — used both as the dedup key and the stored file id
    # (md5 here is an identity key, not a security boundary).
    file_hash = hashlib.md5(file_bytes).hexdigest()

    # 3. Context routing (Deepview's dual-context FS-as-database layout).
    storage_dir = os.getenv(
        "DEEPVIEW_STORAGE_DIR",
        os.path.expanduser("~/Downloads/Coding/医生助理智能体/backend/storage")
    )
    storage_root = Path(storage_dir)
    user_dir = storage_root / "users" / (user_id if user_id else "unknown")
    org_dir = storage_root / "orgs" / (org_id if org_id else "org_001")
    platform_dir = storage_root / "platform"

    ext = os.path.splitext(original_filename)[1].lower()
    # Reject shortcut/executable payloads outright.
    if ext in ('.url', '.exe', '.sh'):
        logger.warning(f"Rejected unsafe file: {original_filename}")
        return

    # Resolve the routing target per context type.
    if context_id.startswith("recording:"):
        recording_id = context_id.split(":", 1)[1]
        parts = recording_id.split("/")
        if len(parts) > 1:
            # Legacy archived format: recording:clientId/asrId
            client_id = parts[0]
            asr_id = parts[-1]
            base_dir = user_dir / "clients" / client_id / "history"
            raw_path = base_dir / f"{asr_id}_raw{ext}"
            md_path = base_dir / f"{asr_id}.md"
        else:
            # New inbox format: recording:asrId
            asr_id = parts[0]
            base_dir = user_dir / "inbox" / asr_id
            raw_path = base_dir / f"asr_raw{ext}"
            md_path = base_dir / "asr.md"
    elif context_id.startswith("client:"):
        client_id = context_id.split(":", 1)[1]
        base_dir = user_dir / "clients" / client_id
        # Client panorama context: loose document archive (contracts, medical records, ...).
        raw_path = base_dir / f"{file_hash}_raw{ext}"
        md_path = base_dir / f"{file_hash}.md"
    elif context_id.startswith("wiki:") or context_id == "deepview":
        # Store in the organization-scoped wiki library.
        base_dir = org_dir / "wiki"
        raw_path = base_dir / f"{file_hash}_raw{ext}"
        md_path = base_dir / f"{file_hash}.md"
    elif context_id.startswith("platform:"):
        # Reserved platform-operations entry point.
        base_dir = platform_dir / "wiki"
        raw_path = base_dir / f"{file_hash}_raw{ext}"
        md_path = base_dir / f"{file_hash}.md"
    elif context_id.startswith("doctor:"):
        base_dir = user_dir
        raw_path = base_dir / f"doctor_profile_raw{ext}"
        md_path = base_dir / "doctor_profile.md"
    else:
        # Fallback quarantine area for unrecognized contexts.
        base_dir = user_dir / "misc" / context_id.replace(":", "_")
        raw_path = base_dir / f"{file_hash}_raw{ext}"
        md_path = base_dir / f"{file_hash}.md"
    base_dir.mkdir(parents=True, exist_ok=True)

    # Idempotency: skip re-processing if the .md already exists.
    if md_path.exists():
        logger.info(f"[DeepviewMaterials] File {original_filename} already processed for {context_id}.")
        push_event_fn(user_id, "material:done", {
            "projectId": context_id,
            "filename": original_filename,
            "fileId": file_hash
        })
        return

    # Persist the raw binary (for provenance and retries).
    with open(raw_path, "wb") as f:
        f.write(file_bytes)

    # Run the matching extraction pipeline off the event loop.
    # get_running_loop() replaces the deprecated get_event_loop() call
    # (we are always inside a coroutine here).
    loop = asyncio.get_running_loop()
    try:
        if ext == ".pdf":
            # Sniff: slide decks / scans go to the VLM, text PDFs to pymupdf4llm.
            if _is_vlm_needed(str(raw_path)):
                logger.info("[DeepviewMaterials] Sniffer: Routing to OSS+VLM Pipeline.")
                await loop.run_in_executor(None, _extract_pdf_vlm, str(raw_path), str(md_path), bucket, file_hash, original_filename)
            else:
                logger.info("[DeepviewMaterials] Sniffer: Routing to Pymupdf4llm text Pipeline.")
                await loop.run_in_executor(None, _extract_pdf_to_md, str(raw_path), str(md_path), original_filename)
        elif ext in [".docx", ".doc"]:
            await loop.run_in_executor(None, _extract_docx, str(raw_path), str(md_path), original_filename)
        elif ext in [".txt", ".md", ".csv"]:
            await loop.run_in_executor(None, _extract_txt, str(raw_path), str(md_path), original_filename)
        elif ext in [".m4a", ".mp3", ".wav", ".webm", ".ogg"]:
            await loop.run_in_executor(None, _extract_audio_asr, str(raw_path), str(md_path), original_filename, bucket, oss_key)
        else:
            await loop.run_in_executor(None, _simulate_extract, str(md_path), original_filename)

        # ★ Persist to the deepview_materials table (DB is the single source of truth).
        try:
            from hermes_state import SessionDB
            db = SessionDB()
            def _do(conn):
                conn.execute(
                    "INSERT OR IGNORE INTO deepview_materials (id, filename, context_id, user_id, source, created_at) "
                    "VALUES (?, ?, ?, ?, 'upload', ?)",
                    (file_hash, original_filename, context_id, user_id, time.time())
                )
            db._execute_write(_do)
        except Exception as e:
            logger.warning(f"[DeepviewMaterials] Failed to persist to DB: {e}")

        # Emit SSE success event.
        logger.info(f"[DeepviewMaterials] Successfully ingested {original_filename}")
        push_event_fn(user_id, "material:done", {
            "projectId": context_id,
            "filename": original_filename,
            "fileId": file_hash
        })
    except Exception as e:
        logger.error(f"[DeepviewMaterials] Failed to extract {original_filename}: {e}", exc_info=True)
def _is_vlm_needed(pdf_path: str) -> bool:
    """Heuristic sniffer: decide whether a PDF needs the VLM pipeline.

    Inspects up to the first two pages. A landscape page (aspect ratio
    above 1.2) suggests a slide deck; very sparse extractable text
    (under 100 chars/page on average) suggests a scan. Either routes to
    the VLM pipeline. Any error falls back to the plain-text pipeline.
    """
    try:
        import fitz
        document = fitz.open(pdf_path)
        page_count = len(document)
        if page_count == 0:
            return False
        sample_size = min(2, page_count)
        landscape_hits = 0
        text_chars = 0
        for idx in range(sample_size):
            pg = document[idx]
            w, h = pg.rect.width, pg.rect.height
            if w > h and (w / h) > 1.2:
                landscape_hits += 1
            text_chars += len(pg.get_text().strip())
        document.close()
        # Any landscape page => treat as slides => VLM.
        if landscape_hits:
            return True
        # Very low average text density => scanned document => VLM.
        return (text_chars / sample_size) < 100
    except Exception as e:
        logger.error(f"Sniffer error: {e}")
        return False  # Fallback to pymupdf text
def _extract_pdf_vlm(pdf_path: str, md_path: str, bucket, file_hash: str, original_filename: str):
    """Render each PDF page to PNG, upload to OSS, and transcribe via the VLM gateway.

    Pages are rasterized at ~200 DPI, pushed to OSS under a per-file
    prefix, signed for 24h, then described one by one through the
    LiteLLM/OpenAI-compatible endpoint. The combined per-page markdown
    is written to ``md_path``.
    """
    import fitz
    import tempfile
    from openai import OpenAI

    # All VLM traffic is funneled through the server-side LiteLLM proxy
    # (standalone DASHSCOPE authorization is fully retired here).
    api_key = os.getenv("GEMINI_API_KEY")
    api_base = os.getenv("GEMINI_BASE_URL", "http://127.0.0.1:4000/v1")
    model_name = os.getenv("DEEPVIEW_MODEL", "gemini3.1pro-vertex")

    with tempfile.TemporaryDirectory() as workdir:
        pdf = fitz.open(pdf_path)
        render_matrix = fitz.Matrix(200 / 72, 200 / 72)  # ~200 DPI raster
        page_urls = []
        for idx, pg in enumerate(pdf):
            pixmap = pg.get_pixmap(matrix=render_matrix)
            png_file = os.path.join(workdir, f"page_{idx + 1:03d}.png")
            pixmap.save(png_file)
            pixmap = None  # drop pixmap reference promptly
            asset_key = f"deepview-assets/{file_hash}/slides/page_{idx+1:03d}.png"
            bucket.put_object_from_file(asset_key, png_file)
            page_urls.append(bucket.sign_url('GET', asset_key, 3600*24))
        pdf.close()

        if not api_key or not api_base:
            # Images are already on OSS; record why transcription was skipped.
            with open(md_path, "w", encoding="utf-8") as f:
                f.write(f"# Document: VLM Extraction Skipped\n\n缺少LiteLLM网关配置 (GEMINI_API_KEY/GEMINI_BASE_URL)。已将 {len(page_urls)} 页图片传至 OSS。")
            return

        client = OpenAI(base_url=api_base, api_key=api_key, timeout=120)
        page_sections = []
        for idx, page_url in enumerate(page_urls):
            prompt = "详细分析这张页面图片。如果是PPT请提炼核心观点、标题和要素。如果是扫描件请保留每一段具体文字。用纯粹的Markdown格式输出不要使用```markdown包裹。"
            try:
                completion = client.chat.completions.create(
                    model=model_name,
                    messages=[{
                        "role": "user",
                        "content": [
                            {"type": "image_url", "image_url": {"url": page_url}},
                            {"type": "text", "text": prompt}
                        ]
                    }],
                    max_tokens=2048,
                    temperature=0.1
                )
                body = completion.choices[0].message.content.strip()
                # Strip a leading ```markdown fence plus stray backticks/whitespace.
                if body.startswith("```markdown"):
                    body = body[11:]
                body = body.strip("`\n ")
                page_sections.append(f"## 第 {idx+1}\n\n{body}\n")
            except Exception as e:
                logger.error(f"LiteLLM VLM error page {idx+1}: {e}")
                page_sections.append(f"## 第 {idx+1}\n*(视觉提取超时或失败)*\n")

        with open(md_path, "w", encoding="utf-8") as f:
            f.write(f"# Document: {original_filename}\n\n")
            f.write("\n---\n".join(page_sections))
def _extract_pdf_to_md(pdf_path: str, md_path: str, original_filename: str):
    """Convert a text-based PDF to markdown with pymupdf4llm; stub out on failure."""
    try:
        import pymupdf4llm
        markdown_body = pymupdf4llm.to_markdown(pdf_path)
        with open(md_path, "w", encoding="utf-8") as out:
            out.write(f"# Document: {original_filename}\n\n" + markdown_body)
    except Exception as e:
        logger.error(f"pymupdf4llm error: {e}")
        _simulate_extract(md_path, os.path.basename(pdf_path))
def _extract_docx(docx_path: str, md_path: str, original_filename: str):
    """Extract all non-empty paragraphs of a Word document into markdown."""
    try:
        import docx
        paragraphs = docx.Document(docx_path).paragraphs
        body = "\n\n".join(p.text for p in paragraphs if p.text.strip())
        with open(md_path, "w", encoding="utf-8") as out:
            out.write(f"# Document: {original_filename}\n\n")
            out.write(body)
    except Exception as e:
        logger.error(f"docx error: {e}")
        _simulate_extract(md_path, os.path.basename(docx_path))
def _extract_txt(txt_path: str, md_path: str, original_filename: str):
try:
with open(txt_path, "r", encoding="utf-8") as f:
text = f.read()
with open(md_path, "w", encoding="utf-8") as f:
f.write(f"# Document: {original_filename}\n\n")
f.write(text)
except Exception as e:
logger.error(f"txt error: {e}")
_simulate_extract(md_path, os.path.basename(txt_path))
def _extract_audio_asr(audio_path: str, md_path: str, original_filename: str, bucket, oss_key: str):
    """Transcribe a long audio file through DashScope's async Paraformer ASR.

    Signs the already-uploaded OSS object, submits an async transcription
    task with speaker diarization, polls until a terminal status (or the
    deadline expires), then writes a speaker-separated markdown transcript
    to ``md_path``. Every failure path degrades to the stub extractor so a
    .md file is always produced.
    """
    import requests
    import time
    poll_interval_s = 3      # pause between status checks
    poll_deadline_s = 1800   # hard cap: the old loop could poll forever on a stuck task
    try:
        dashscope_key = os.getenv("DASHSCOPE_API_KEY", "")
        if not dashscope_key:
            logger.error("[DeepviewMaterials] Missing DASHSCOPE_API_KEY for ASR.")
            _simulate_extract(md_path, f"{original_filename} (ASR Failed: No DASHSCOPE_API_KEY)")
            return
        logger.info(f"[DeepviewMaterials] Submitting Long Audio ASR Task for {original_filename}")
        # 1. Sign the frontend-uploaded OSS object so DashScope's async worker can fetch it.
        audio_url = bucket.sign_url('GET', oss_key, 3600 * 24)
        # 2. Submit the async transcription task.
        headers = {
            "Authorization": f"Bearer {dashscope_key}",
            "Content-Type": "application/json",
            "X-DashScope-Async": "enable"
        }
        payload = {
            "model": "paraformer-v2",  # long-audio engine (the only offline async service with diarization)
            "input": {"file_urls": [audio_url]},
            "parameters": {
                "diarization_enabled": True  # enable speaker-role separation
            }
        }
        resp = requests.post(
            "https://dashscope.aliyuncs.com/api/v1/services/audio/asr/transcription",
            json=payload, headers=headers, timeout=30
        )
        if resp.status_code != 200:
            logger.error(f"[DeepviewMaterials] ASR Submit Failed: {resp.text}")
            _simulate_extract(md_path, f"{original_filename} (ASR Submit Failed)")
            return
        task_id = resp.json()["output"]["task_id"]
        logger.info(f"[DeepviewMaterials] ASR Task Submitted: {task_id}. Polling...")
        # 3. Poll until SUCCEEDED / FAILED or the deadline expires.
        polling_url = f"https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}"
        deadline = time.monotonic() + poll_deadline_s
        while time.monotonic() < deadline:
            status_resp = requests.get(polling_url, headers=headers, timeout=30)
            if status_resp.status_code != 200:
                logger.error(f"[DeepviewMaterials] ASR Polling HTTP Error: {status_resp.text}")
                break
            data = status_resp.json()
            status = data["output"]["task_status"]
            if status == "SUCCEEDED":
                result_url = data["output"]["results"][0]["transcription_url"]
                result_resp = requests.get(result_url, timeout=60)
                transcripts = result_resp.json().get("transcripts", [])
                if not transcripts:
                    with open(md_path, "w", encoding="utf-8") as f:
                        f.write(f"# 🎵 面诊录音: {original_filename}\n\n*(未能提取出任何语音)*")
                    return
                sentences = transcripts[0].get("sentences", [])
                # 4. Write transcript: start a new paragraph whenever the speaker changes.
                with open(md_path, "w", encoding="utf-8") as f:
                    f.write(f"# 🎵 面诊录音: {original_filename}\n\n")
                    last_speaker = None
                    for s in sentences:
                        spk = s.get("speaker_id", "Unknown")
                        # Paraformer typically labels speakers spk_0, spk_1, ... by voice clustering.
                        spk_label = f"**说话人 {spk}**" if spk != "Unknown" else "**未知说话人**"
                        text = s.get("text", "")
                        if spk != last_speaker:
                            f.write(f"\n\n{spk_label}: {text}")
                            last_speaker = spk
                        else:
                            f.write(f" {text}")
                logger.info(f"[DeepviewMaterials] ASR Diarization successfully completed for {original_filename}")
                return
            elif status == "FAILED":
                logger.error(f"[DeepviewMaterials] ASR Task Failed Internally: {data}")
                break
            time.sleep(poll_interval_s)
        # Fallback: polling broke out or the deadline passed without a transcript.
        _simulate_extract(md_path, f"{original_filename} (ASR Polling Failed or Timeout)")
    except Exception as e:
        logger.error(f"[DeepviewMaterials] audio error: {e}", exc_info=True)
        _simulate_extract(md_path, os.path.basename(audio_path))
def _simulate_extract(md_path: str, original_filename: str):
with open(md_path, "w", encoding="utf-8") as f:
f.write(f"# Document: {original_filename}\n\nNotice: Extracted via simple fallback parser.")