# Deepview material ingestion pipeline: OSS download, dedup, context routing, extraction to Markdown.
import asyncio
|
||
import hashlib
|
||
import os
|
||
import time
|
||
import logging
|
||
import json
|
||
import re
|
||
from pathlib import Path
|
||
from hermes_constants import get_hermes_home
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
async def process_uploaded_material_oss(
    oss_key: str,
    original_filename: str,
    context_id: str,
    push_event_fn,
    user_id: str,
    org_id: str = "org_001"
):
    """
    Ingest a user-uploaded material from OSS into the FS-as-database store.

    1. Downloads native PDF from OSS
    2. Runs deduplication
    3. Sniffs doc to decide between VLM or pure text
    4. Executes the relevant pipeline to produce .md

    Args:
        oss_key: Object key of the uploaded file in the OSS bucket.
        original_filename: Client-side filename (drives extension routing).
        context_id: Routing context, e.g. "recording:...", "client:...",
            "wiki:...", "platform:...", "doctor:...".
        push_event_fn: Callback ``(user_id, event_name, payload)`` used to
            push SSE events back to the client.
        user_id: Owner of the upload; used for the per-user storage tree.
        org_id: Organization scope for wiki material; defaults to "org_001".
    """
    # 1. Fetch from OSS
    oss_ak = os.getenv("ALIYUN_ACCESS_KEY_ID", "")
    oss_sk = os.getenv("ALIYUN_ACCESS_KEY_SECRET", "")
    oss_bucket = os.getenv("OSS_BUCKET", "meetings-dev")
    oss_endpoint = os.getenv("OSS_ENDPOINT", "oss-cn-beijing.aliyuncs.com")

    if not all([oss_ak, oss_sk]):
        logger.error("[DeepviewMaterials] Missing OSS keys in backend .env. Skipping ingestion.")
        return

    try:
        import oss2
        auth = oss2.Auth(oss_ak, oss_sk)
        bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket)

        logger.info(f"[DeepviewMaterials] Over internal network fetching {oss_key}...")
        result = bucket.get_object(oss_key)
        file_bytes = result.read()
    except Exception as e:
        logger.error(f"[DeepviewMaterials] OSS Download failed: {e}")
        return

    # 2. Content hash: doubles as the dedup/idempotency key and the stored
    # file id. md5 is acceptable here — it is not a security boundary.
    file_hash = hashlib.md5(file_bytes).hexdigest()

    # 3. Resolve context routing (Deepview dual-context FS-as-database pipeline)
    storage_dir = os.getenv("DEEPVIEW_STORAGE_DIR", os.path.expanduser("~/Downloads/Coding/医生助理智能体/backend/storage"))
    storage_root = Path(storage_dir)
    user_id_safe = user_id if user_id else "unknown"
    org_id_safe = org_id if org_id else "org_001"

    user_dir = storage_root / "users" / user_id_safe
    org_dir = storage_root / "orgs" / org_id_safe
    platform_dir = storage_root / "platform"

    ext = os.path.splitext(original_filename)[1].lower()

    # Reject potentially dangerous file types outright.
    if ext in ('.url', '.exe', '.sh'):
        logger.warning(f"Rejected unsafe file: {original_filename}")
        return

    # Map the context id onto concrete storage paths.
    base_dir, raw_path, md_path = _route_material_paths(
        context_id, user_dir, org_dir, platform_dir, file_hash, ext
    )
    base_dir.mkdir(parents=True, exist_ok=True)

    # Idempotency check: this file was already processed for this context.
    if md_path.exists():
        logger.info(f"[DeepviewMaterials] File {original_filename} already processed for {context_id}.")
        push_event_fn(user_id, "material:done", {
            "projectId": context_id,
            "filename": original_filename,
            "fileId": file_hash
        })
        return

    # Persist the raw bytes (for provenance and retries).
    with open(raw_path, "wb") as f:
        f.write(file_bytes)

    # Process. Extraction is blocking (CPU / network heavy), so it runs in
    # the default thread-pool executor to keep the event loop responsive.
    # get_running_loop() is the correct call inside a coroutine
    # (get_event_loop() is deprecated here since Python 3.10).
    loop = asyncio.get_running_loop()
    try:
        if ext == ".pdf":
            # Sniffer: landscape / low-text-density PDFs go to the VLM pipeline.
            if _is_vlm_needed(str(raw_path)):
                logger.info("[DeepviewMaterials] Sniffer: Routing to OSS+VLM Pipeline.")
                await loop.run_in_executor(None, _extract_pdf_vlm, str(raw_path), str(md_path), bucket, file_hash, original_filename)
            else:
                logger.info("[DeepviewMaterials] Sniffer: Routing to Pymupdf4llm text Pipeline.")
                await loop.run_in_executor(None, _extract_pdf_to_md, str(raw_path), str(md_path), original_filename)
        elif ext in [".docx", ".doc"]:
            await loop.run_in_executor(None, _extract_docx, str(raw_path), str(md_path), original_filename)
        elif ext in [".txt", ".md", ".csv"]:
            await loop.run_in_executor(None, _extract_txt, str(raw_path), str(md_path), original_filename)
        elif ext in [".m4a", ".mp3", ".wav", ".webm", ".ogg"]:
            await loop.run_in_executor(None, _extract_audio_asr, str(raw_path), str(md_path), original_filename, bucket, oss_key)
        else:
            await loop.run_in_executor(None, _simulate_extract, str(md_path), original_filename)

        # ★ Persist to the deepview_materials table (DB is the single source of truth).
        try:
            from hermes_state import SessionDB
            db = SessionDB()

            def _do(conn):
                conn.execute(
                    "INSERT OR IGNORE INTO deepview_materials (id, filename, context_id, user_id, source, created_at) "
                    "VALUES (?, ?, ?, ?, 'upload', ?)",
                    (file_hash, original_filename, context_id, user_id, time.time())
                )

            db._execute_write(_do)
        except Exception as e:
            logger.warning(f"[DeepviewMaterials] Failed to persist to DB: {e}")

        # Emit SSE success
        logger.info(f"[DeepviewMaterials] Successfully ingested {original_filename}")
        push_event_fn(user_id, "material:done", {
            "projectId": context_id,
            "filename": original_filename,
            "fileId": file_hash
        })

    except Exception as e:
        logger.error(f"[DeepviewMaterials] Failed to extract {original_filename}: {e}", exc_info=True)


def _route_material_paths(context_id: str, user_dir: Path, org_dir: Path,
                          platform_dir: Path, file_hash: str, ext: str):
    """Map a context_id onto (base_dir, raw_path, md_path) in the FS store."""
    if context_id.startswith("recording:"):
        recording_id = context_id.split(":", 1)[1]
        parts = recording_id.split("/")
        if len(parts) > 1:
            # Legacy (archived) format: recording:clientId/asrId
            client_id = parts[0]
            asr_id = parts[-1]
            base_dir = user_dir / "clients" / client_id / "history"
            raw_path = base_dir / f"{asr_id}_raw{ext}"
            md_path = base_dir / f"{asr_id}.md"
        else:
            # New (inbox) format: recording:asrId
            asr_id = parts[0]
            base_dir = user_dir / "inbox" / asr_id
            raw_path = base_dir / f"asr_raw{ext}"
            md_path = base_dir / "asr.md"
    elif context_id.startswith("client:"):
        client_id = context_id.split(":", 1)[1]
        base_dir = user_dir / "clients" / client_id
        # Client panorama context: fragmentary archived material
        # (possibly contracts, medical records, etc.).
        raw_path = base_dir / f"{file_hash}_raw{ext}"
        md_path = base_dir / f"{file_hash}.md"
    elif context_id.startswith("wiki:") or context_id == "deepview":
        # Store into the organization-scoped wiki library.
        base_dir = org_dir / "wiki"
        raw_path = base_dir / f"{file_hash}_raw{ext}"
        md_path = base_dir / f"{file_hash}.md"
    elif context_id.startswith("platform:"):
        # Reserved platform-operations entry point.
        base_dir = platform_dir / "wiki"
        raw_path = base_dir / f"{file_hash}_raw{ext}"
        md_path = base_dir / f"{file_hash}.md"
    elif context_id.startswith("doctor:"):
        base_dir = user_dir
        raw_path = base_dir / f"doctor_profile_raw{ext}"
        md_path = base_dir / "doctor_profile.md"
    else:
        # Fallback quarantine area for unrecognized contexts.
        base_dir = user_dir / "misc" / context_id.replace(":", "_")
        raw_path = base_dir / f"{file_hash}_raw{ext}"
        md_path = base_dir / f"{file_hash}.md"
    return base_dir, raw_path, md_path
|
||
|
||
|
||
def _is_vlm_needed(pdf_path: str) -> bool:
    """
    Heuristically decide whether a PDF needs the VLM (vision) pipeline.

    Returns True when the first pages look like slides (landscape aspect
    ratio > 1.2) or like a scan (average extractable text under 100 chars
    per page); False for ordinary text PDFs or on any sniffing error.
    """
    try:
        import fitz

        # Context manager guarantees the document handle is closed even if a
        # page operation raises (the original leaked it on that path, since
        # the except clause returned without calling doc.close()).
        with fitz.open(pdf_path) as doc:
            if len(doc) == 0:
                return False

            check_pages = min(2, len(doc))
            total_text_len = 0
            vlm_vote = 0

            for i in range(check_pages):
                page = doc[i]
                rect = page.rect
                width, height = rect.width, rect.height
                # Landscape geometry strongly suggests a slide-deck export.
                if width > height and (width / height) > 1.2:
                    vlm_vote += 1
                total_text_len += len(page.get_text().strip())

        # If any page is landscape => PPT => VLM
        if vlm_vote > 0:
            return True

        # If extremely low text density => Scanned => VLM
        if check_pages > 0 and (total_text_len / check_pages) < 100:
            return True

        return False
    except Exception as e:
        logger.error(f"Sniffer error: {e}")
        return False  # Fallback to pymupdf text
|
||
|
||
def _extract_pdf_vlm(pdf_path: str, md_path: str, bucket, file_hash: str, original_filename: str):
    """
    Extract a slide-deck / scanned PDF to Markdown via a vision LLM.

    Renders every page to a 200-DPI PNG, uploads the images to OSS, then asks
    the VLM (through the LiteLLM gateway) to transcribe each page. Per-page
    failures degrade to a placeholder section instead of aborting the run.

    Args:
        pdf_path: Path of the raw PDF on disk.
        md_path: Destination path of the generated Markdown file.
        bucket: An authenticated oss2.Bucket used for uploads and signed URLs.
        file_hash: Content hash used to namespace the uploaded page images.
        original_filename: Shown in the Markdown title.
    """
    import fitz
    import tempfile
    from openai import OpenAI

    # All VLM traffic funnels through the server-side LiteLLM proxy
    # (standalone DASHSCOPE authorization is fully deprecated).
    litellm_key = os.getenv("GEMINI_API_KEY")
    litellm_base = os.getenv("GEMINI_BASE_URL", "http://127.0.0.1:4000/v1")
    vlm_model = os.getenv("DEEPVIEW_MODEL", "gemini3.1pro-vertex")

    with tempfile.TemporaryDirectory() as tmp_dir:
        # 200 DPI rendering (PDF user space is 72 DPI).
        zoom = 200 / 72
        mat = fitz.Matrix(zoom, zoom)

        urls = []
        # "with" closes the document even when rendering or the OSS upload
        # raises (the original could leak the handle on that path).
        with fitz.open(pdf_path) as doc:
            for i, page in enumerate(doc):
                pix = page.get_pixmap(matrix=mat)
                img_path = os.path.join(tmp_dir, f"page_{i + 1:03d}.png")
                pix.save(img_path)
                pix = None  # release pixmap memory promptly

                oss_key = f"deepview-assets/{file_hash}/slides/page_{i+1:03d}.png"
                bucket.put_object_from_file(oss_key, img_path)
                # 24h signed URL so the remote VLM can fetch the image.
                url = bucket.sign_url('GET', oss_key, 3600*24)
                urls.append(url)

    if not litellm_key or not litellm_base:
        # Images are already on OSS at this point; record why transcription
        # was skipped so a later retry can pick up from here.
        with open(md_path, "w", encoding="utf-8") as f:
            f.write(f"# Document: VLM Extraction Skipped\n\n缺少LiteLLM网关配置 (GEMINI_API_KEY/GEMINI_BASE_URL)。已将 {len(urls)} 页图片传至 OSS。")
        return

    client = OpenAI(base_url=litellm_base, api_key=litellm_key, timeout=120)

    markdown_blocks = []
    for i, u in enumerate(urls):
        prompt = "详细分析这张页面图片。如果是PPT请提炼核心观点、标题和要素。如果是扫描件请保留每一段具体文字。用纯粹的Markdown格式输出,不要使用```markdown包裹。"
        try:
            resp = client.chat.completions.create(
                model=vlm_model,
                messages=[{
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": {"url": u}},
                        {"type": "text", "text": prompt}
                    ]
                }],
                max_tokens=2048,
                temperature=0.1
            )
            # content can legally be None on some gateways; the original
            # crashed with AttributeError on .strip() in that case.
            txt = (resp.choices[0].message.content or "").strip()
            # Strip a leading ```markdown fence despite the prompt asking not to wrap.
            if txt.startswith("```markdown"):
                txt = txt[11:]
            txt = txt.strip("`\n ")
            markdown_blocks.append(f"## 第 {i+1} 页\n\n{txt}\n")
        except Exception as e:
            logger.error(f"LiteLLM VLM error page {i+1}: {e}")
            markdown_blocks.append(f"## 第 {i+1} 页\n*(视觉提取超时或失败)*\n")

    with open(md_path, "w", encoding="utf-8") as f:
        f.write(f"# Document: {original_filename}\n\n")
        f.write("\n---\n".join(markdown_blocks))
|
||
|
||
def _extract_pdf_to_md(pdf_path: str, md_path: str, original_filename: str):
    """
    Convert a text-based PDF into Markdown via pymupdf4llm.

    On any failure (missing library, corrupt file) degrades to the
    placeholder writer instead of raising.
    """
    try:
        import pymupdf4llm

        body = pymupdf4llm.to_markdown(pdf_path)
        with open(md_path, "w", encoding="utf-8") as out:
            out.writelines([f"# Document: {original_filename}\n\n", body])
    except Exception as e:
        logger.error(f"pymupdf4llm error: {e}")
        _simulate_extract(md_path, os.path.basename(pdf_path))
|
||
|
||
def _extract_docx(docx_path: str, md_path: str, original_filename: str):
    """
    Extract all non-empty paragraphs from a Word document into Markdown.

    Falls back to the placeholder writer when python-docx is unavailable
    or the file cannot be parsed.
    """
    try:
        import docx

        document = docx.Document(docx_path)
        paragraphs = [para.text for para in document.paragraphs if para.text.strip()]
        with open(md_path, "w", encoding="utf-8") as out:
            out.write(f"# Document: {original_filename}\n\n")
            out.write("\n\n".join(paragraphs))
    except Exception as e:
        logger.error(f"docx error: {e}")
        _simulate_extract(md_path, os.path.basename(docx_path))
|
||
|
||
def _extract_txt(txt_path: str, md_path: str, original_filename: str):
|
||
try:
|
||
with open(txt_path, "r", encoding="utf-8") as f:
|
||
text = f.read()
|
||
with open(md_path, "w", encoding="utf-8") as f:
|
||
f.write(f"# Document: {original_filename}\n\n")
|
||
f.write(text)
|
||
except Exception as e:
|
||
logger.error(f"txt error: {e}")
|
||
_simulate_extract(md_path, os.path.basename(txt_path))
|
||
|
||
def _extract_audio_asr(audio_path: str, md_path: str, original_filename: str, bucket, oss_key: str):
    """
    Transcribe a long audio file via DashScope's async Paraformer ASR with
    speaker diarization, writing a speaker-segmented Markdown transcript.

    Every failure path degrades to the placeholder writer. Polling is bounded
    by a wall-clock deadline and every HTTP call carries a timeout, so a
    stuck task or a hung connection can no longer pin the worker thread
    forever (the original used an unbounded ``while True`` with no request
    timeouts).

    Args:
        audio_path: Local path of the raw audio (used only for fallback naming).
        md_path: Destination path of the Markdown transcript.
        original_filename: Shown in the transcript title.
        bucket: Authenticated oss2.Bucket used to sign the audio URL.
        oss_key: OSS key of the uploaded audio object.
    """
    import requests

    try:
        dashscope_key = os.getenv("DASHSCOPE_API_KEY", "")
        if not dashscope_key:
            logger.error("[DeepviewMaterials] Missing DASHSCOPE_API_KEY for ASR.")
            _simulate_extract(md_path, f"{original_filename} (ASR Failed: No DASHSCOPE_API_KEY)")
            return

        logger.info(f"[DeepviewMaterials] Submitting Long Audio ASR Task for {original_filename}")

        # 1. Sign the OSS URL the frontend uploaded to (24h, so DashScope's
        #    async long-audio service can fetch it).
        audio_url = bucket.sign_url('GET', oss_key, 3600 * 24)

        # 2. Submit the async transcription task.
        headers = {
            "Authorization": f"Bearer {dashscope_key}",
            "Content-Type": "application/json",
            "X-DashScope-Async": "enable"
        }
        payload = {
            "model": "paraformer-v2",  # long-audio engine; the only offline async service supporting diarization
            "input": {"file_urls": [audio_url]},
            "parameters": {
                "diarization_enabled": True  # enable speaker separation (voice clustering)
            }
        }

        resp = requests.post(
            "https://dashscope.aliyuncs.com/api/v1/services/audio/asr/transcription",
            json=payload, headers=headers, timeout=30
        )
        if resp.status_code != 200:
            logger.error(f"[DeepviewMaterials] ASR Submit Failed: {resp.text}")
            _simulate_extract(md_path, f"{original_filename} (ASR Submit Failed)")
            return

        task_id = resp.json()["output"]["task_id"]
        logger.info(f"[DeepviewMaterials] ASR Task Submitted: {task_id}. Polling...")

        # 3. Poll for the result, bounded by a 30-minute deadline (generous
        #    even for multi-hour consultations transcribed asynchronously).
        polling_url = f"https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}"
        deadline = time.monotonic() + 30 * 60
        while time.monotonic() < deadline:
            status_resp = requests.get(polling_url, headers=headers, timeout=30)
            if status_resp.status_code != 200:
                logger.error(f"[DeepviewMaterials] ASR Polling HTTP Error: {status_resp.text}")
                break

            data = status_resp.json()
            status = data["output"]["task_status"]

            if status == "SUCCEEDED":
                result_url = data["output"]["results"][0]["transcription_url"]
                result_resp = requests.get(result_url, timeout=60)
                transcripts = result_resp.json().get("transcripts", [])

                if not transcripts:
                    with open(md_path, "w", encoding="utf-8") as f:
                        f.write(f"# 🎵 面诊录音: {original_filename}\n\n*(未能提取出任何语音)*")
                    return

                sentences = transcripts[0].get("sentences", [])

                # 4. Write the transcript, starting a new line whenever the
                #    detected speaker changes.
                with open(md_path, "w", encoding="utf-8") as f:
                    f.write(f"# 🎵 面诊录音: {original_filename}\n\n")
                    last_speaker = None
                    for s in sentences:
                        spk = s.get("speaker_id", "Unknown")
                        # Paraformer typically returns spk_0 / spk_1 from voice clustering.
                        spk_label = f"**说话人 {spk}**" if spk != "Unknown" else "**未知说话人**"
                        text = s.get("text", "")

                        if spk != last_speaker:
                            f.write(f"\n\n{spk_label}: {text}")
                            last_speaker = spk
                        else:
                            f.write(f" {text}")

                logger.info(f"[DeepviewMaterials] ASR Diarization successfully completed for {original_filename}")
                return

            elif status == "FAILED":
                logger.error(f"[DeepviewMaterials] ASR Task Failed Internally: {data}")
                break

            time.sleep(3)  # polling interval

        # Fallback: polling error, task failure, or deadline exceeded.
        _simulate_extract(md_path, f"{original_filename} (ASR Polling Failed or Timeout)")

    except Exception as e:
        logger.error(f"[DeepviewMaterials] audio error: {e}", exc_info=True)
        _simulate_extract(md_path, os.path.basename(audio_path))
|
||
|
||
def _simulate_extract(md_path: str, original_filename: str):
|
||
with open(md_path, "w", encoding="utf-8") as f:
|
||
f.write(f"# Document: {original_filename}\n\nNotice: Extracted via simple fallback parser.")
|