"""
Deepview face-consultation agent SSE bridge adapter (deepview_sse.py)

A standalone aiohttp HTTP server that serves the Deepview face-consultation
agent frontend:
  - GET  /deepview/events  — global SSE event stream (JWT auth)
  - POST /deepview/chat    — start a conversation (JWT auth)
  - GET  /deepview/health  — health check

Auth: reuses the MindPass unified authentication system; the JWT ``sub``
field is used as userId.
Isolation: three-layer model — userId (identity) × connId (browser tab)
× chatId (conversation).

Translation rules (based on an audit of the Hermes engine source):
  - stream_delta_callback(text)               → SSE agent:chunk
  - stream_delta_callback(None)               → [dropped]
  - tool_progress("tool.started")             → SSE agent:thinking
  - tool_progress("tool.completed")           → [dropped]
  - tool_progress("reasoning.available")      → [dropped]
  - run_conversation() returns                → SSE agent:done
  - run_conversation() raises                 → SSE agent:error
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import time
|
||
import uuid
|
||
from typing import Any, Dict, Optional
|
||
|
||
try:
|
||
from aiohttp import web, ClientSession, ClientTimeout
|
||
AIOHTTP_AVAILABLE = True
|
||
except ImportError:
|
||
AIOHTTP_AVAILABLE = False
|
||
web = None # type: ignore[assignment]
|
||
ClientSession = None # type: ignore[assignment]
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ── MindPass verification delegation (path B: this service holds no signing key
#    and calls MindPass /api/auth/me to verify each token) ──
# Base URL of the MindPass service; overridable through the environment.
MINDPASS_BASE_URL = os.environ.get("MINDPASS_BASE_URL", "http://127.0.0.1:3020")

# In-memory cache so the same token does not hit MindPass on every request (TTL 60 s).
_tokenCache: Dict[str, tuple] = {} # token -> (userDict, expireTimestamp)
_TOKEN_CACHE_TTL = 60 # seconds
|
||
|
||
|
||
async def _verifyTokenAsync(token: str) -> Optional[dict]:
|
||
"""
|
||
★ 路径 B:委托 MindPass 验证 Token。
|
||
本服务不持有 JWT 签发密钥,通过 HTTP 调用 MindPass 中央认证服务。
|
||
返回 {userId, phone, name, avatarUrl} 或 None。
|
||
"""
|
||
if not token:
|
||
return None
|
||
|
||
# 开发模式:未配置 MINDPASS_BASE_URL 且不是默认值时,允许 userId 直传
|
||
devMode = os.environ.get("DEEPVIEW_AUTH_DEV_MODE", "")
|
||
if devMode:
|
||
logger.warning("[DeepviewSSE] Auth dev mode enabled, using token as userId")
|
||
return {"userId": token, "phone": "", "name": "开发用户"}
|
||
|
||
# 检查内存缓存
|
||
now = time.time()
|
||
cached = _tokenCache.get(token)
|
||
if cached and cached[1] > now:
|
||
return cached[0]
|
||
|
||
# 调用 MindPass /api/auth/me
|
||
try:
|
||
timeout = ClientTimeout(total=5)
|
||
async with ClientSession(timeout=timeout) as session:
|
||
async with session.get(
|
||
f"{MINDPASS_BASE_URL}/api/auth/me",
|
||
headers={"Authorization": f"Bearer {token}"},
|
||
) as resp:
|
||
if resp.status != 200:
|
||
logger.info("[DeepviewSSE] MindPass returned %d for token verification", resp.status)
|
||
return None
|
||
data = await resp.json()
|
||
user = {
|
||
"userId": data["id"],
|
||
"phone": data.get("phone", ""),
|
||
"name": data.get("name", ""),
|
||
"avatarUrl": data.get("avatarUrl", ""),
|
||
}
|
||
# 缓存结果
|
||
_tokenCache[token] = (user, now + _TOKEN_CACHE_TTL)
|
||
return user
|
||
except Exception as e:
|
||
logger.error("[DeepviewSSE] MindPass verification failed: %s", e)
|
||
return None
|
||
|
||
|
||
def _verifyToken(token: str) -> Optional[dict]:
    """
    Synchronous wrapper around ``_verifyTokenAsync``.

    Fallback for synchronous call sites; code running inside an aiohttp
    handler should await ``_verifyTokenAsync`` directly.

    Args:
        token: Raw bearer token.

    Returns:
        The user dict from ``_verifyTokenAsync``, or ``None`` on any failure
        (including the 6-second timeout of the threaded path).
    """
    import concurrent.futures

    try:
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: drive the coroutine on a
            # private loop so the thread's current-loop state is left untouched.
            privateLoop = asyncio.new_event_loop()
            try:
                return privateLoop.run_until_complete(_verifyTokenAsync(token))
            finally:
                privateLoop.close()

        # A loop is already running here, so run the coroutine on a helper
        # thread with its own loop. NOTE: this blocks the calling thread
        # (and therefore the running loop) for up to 6 seconds.
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = pool.submit(asyncio.run, _verifyTokenAsync(token))
            return future.result(timeout=6)
        finally:
            # Do not wait for the worker; otherwise the timeout above is moot.
            pool.shutdown(wait=False)
    except Exception as e:
        logger.error("[DeepviewSSE] Sync token verify failed: %s", e)
        return None
|
||
|
||
|
||
# ── 工具名 → 用户可读标签 ──
|
||
_STEP_LABELS = {
|
||
"read_file": "📖 正在查阅知识库",
|
||
"search_files": "🔍 正在搜索相关文件",
|
||
"web_search": "🌐 正在搜索网络",
|
||
"web_extract": "📄 正在提取网页内容",
|
||
"terminal": "⚙️ 正在执行命令",
|
||
"vision_analyze": "👁️ 正在分析图片",
|
||
"patch": "✏️ 正在修改文件",
|
||
"write_file": "💾 正在写入文件",
|
||
}
|
||
|
||
|
||
def _translate_step(toolName: str, preview: str) -> str:
|
||
"""将 Hermes 工具名翻译为前端展示文案。"""
|
||
return _STEP_LABELS.get(toolName, f"🔧 {preview or toolName}")
|
||
|
||
|
||
# ── CORS ──
# Headers merged into handler responses. Any origin is allowed ("*"), the
# permitted methods are GET/POST/OPTIONS, and the allowed request headers are
# Content-Type and Authorization. The Max-Age value is 600 (seconds).
_CORS_HEADERS = {
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
    "Access-Control-Allow-Headers": "Content-Type, Authorization",
    "Access-Control-Max-Age": "600",
}
|
||
|
||
|
||
class DeepviewSSEServer:
|
||
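"""
Dedicated SSE server for the Deepview face-consultation agent frontend.

Responsibilities:
  1. Manage long-lived SSE connections for multiple advisors and multiple browser tabs.
  2. Translate low-level Hermes AIAgent callbacks into the 7 SSE events of the closed event set.
  3. Provide the business APIs (chat, file upload, pinning conclusions).

Isolation model:
  - userId (identity layer)     ← MindPass JWT sub; decides who receives an event
  - connId (connection layer)   ← unique per browser tab; several tabs of one user coexist
  - chatId (conversation layer) ← unique per chat tab; the frontend buckets rendering by chatId
"""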
|
||
|
||
def __init__(self, host: str = "127.0.0.1", port: int = 8643):
|
||
self._host = host
|
||
self._port = port
|
||
self._app: Optional["web.Application"] = None
|
||
self._runner: Optional["web.AppRunner"] = None
|
||
# userId → { connId → asyncio.Queue }
|
||
# 同一用户的多个标签页各有自己的 Queue,事件广播到所有连接
|
||
self._sseClients: Dict[str, Dict[str, "asyncio.Queue[Optional[str]]"]] = {}
|
||
# ★ 初始化钉选板存储
|
||
self._initPinsDb()
|
||
# ★ 初始化素材文件表
|
||
self._initMaterialsDb()
|
||
# ★ 初始化报告持久化表
|
||
self._initReportsDb()
|
||
# ★ 初始化扩展档案表
|
||
self._initProfilesDb()
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 存储辅助方法
|
||
# ──────────────────────────────────────────────
|
||
|
||
def _getUserStorageDir(self, userId: str) -> str:
|
||
"""返回当前用户的专属存储沙箱根路径,不存在时自动创建。"""
|
||
storageDir = os.getenv("DEEPVIEW_STORAGE_DIR", os.path.expanduser("~/Downloads/Coding/医生助理智能体/backend/storage"))
|
||
userDir = os.path.join(storageDir, "users", userId)
|
||
os.makedirs(userDir, exist_ok=True)
|
||
return userDir
|
||
|
||
def _getOrgStorageDir(self, orgId: str) -> str:
|
||
"""返回机构的存储沙箱根路径,不存在时自动创建。"""
|
||
storageDir = os.getenv("DEEPVIEW_STORAGE_DIR", os.path.expanduser("~/Downloads/Coding/医生助理智能体/backend/storage"))
|
||
orgDir = os.path.join(storageDir, "orgs", orgId)
|
||
os.makedirs(orgDir, exist_ok=True)
|
||
return orgDir
|
||
|
||
# ──────────────────────────────────────────────
|
||
# ──────────────────────────────────────────────
|
||
|
||
def _pushEvent(self, userId: str, eventName: str, data: dict) -> None:
|
||
"""向指定用户的所有 SSE 连接(标签页)推送事件。"""
|
||
conns = self._sseClients.get(userId)
|
||
if not conns:
|
||
logger.debug("SSE push to disconnected userId=%s, event=%s dropped", userId, eventName)
|
||
return
|
||
payload = f"event: {eventName}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
|
||
for connId, q in list(conns.items()):
|
||
try:
|
||
q.put_nowait(payload)
|
||
except asyncio.QueueFull:
|
||
logger.warning("SSE queue full: userId=%s connId=%s, dropping event=%s",
|
||
userId, connId, eventName)
|
||
|
||
def _broadcastEvent(self, eventName: str, data: dict) -> None:
|
||
"""向所有已连接的用户推送事件(用于全局通知如 wiki:updated)。"""
|
||
for userId in list(self._sseClients.keys()):
|
||
self._pushEvent(userId, eventName, data)
|
||
|
||
# ──────────────────────────────────────────────
|
||
# Hermes AIAgent 回调工厂
|
||
# ──────────────────────────────────────────────
|
||
|
||
def _makeStreamDeltaCallback(self, userId: str, chatId: str, loop: asyncio.AbstractEventLoop):
|
||
"""
|
||
创建 stream_delta_callback。
|
||
|
||
翻译规则:
|
||
- text (非 None) → agent:chunk
|
||
- None → 丢弃(CLI 显示层哨兵)
|
||
"""
|
||
def callback(delta):
|
||
if delta is None:
|
||
return # 丢弃 CLI 哨兵信号
|
||
loop.call_soon_threadsafe(
|
||
self._pushEvent, userId, "agent:chunk", {
|
||
"chatId": chatId,
|
||
"text": delta,
|
||
}
|
||
)
|
||
return callback
|
||
|
||
def _makeToolProgressCallback(self, userId: str, chatId: str, loop: asyncio.AbstractEventLoop):
|
||
"""
|
||
创建 tool_progress_callback。
|
||
|
||
翻译规则:
|
||
- tool.started → agent:thinking
|
||
- tool.completed → 丢弃(前端不需要)
|
||
- reasoning.available → 丢弃(与 stream_delta 内容重复)
|
||
- _thinking → 丢弃(子代理中继,本项目不涉及)
|
||
"""
|
||
def callback(eventType, name, preview=None, args=None, **kwargs):
|
||
if eventType == "tool.started":
|
||
# 过滤内部事件
|
||
if name and name.startswith("_"):
|
||
return
|
||
message = _translate_step(name, preview)
|
||
loop.call_soon_threadsafe(
|
||
self._pushEvent, userId, "agent:thinking", {
|
||
"chatId": chatId,
|
||
"step": name,
|
||
"message": message,
|
||
}
|
||
)
|
||
# tool.completed, reasoning.available, _thinking → 全部丢弃
|
||
return callback
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 认证辅助
|
||
# ──────────────────────────────────────────────
|
||
|
||
async def _extractUser(self, request: "web.Request") -> Optional[dict]:
|
||
"""
|
||
从请求中提取用户信息(异步版本,委托 MindPass 验证)。
|
||
|
||
优先级:
|
||
1. Authorization: Bearer <token> 头(HTTP POST 请求用)
|
||
2. ?token=<token> 查询参数(EventSource 连接用,因为 EventSource 不支持自定义 Header)
|
||
"""
|
||
# 1. Header
|
||
authHeader = request.headers.get("Authorization", "")
|
||
if authHeader.startswith("Bearer "):
|
||
token = authHeader[7:].strip()
|
||
user = await _verifyTokenAsync(token)
|
||
if user:
|
||
return user
|
||
|
||
# 2. Query parameter
|
||
token = request.query.get("token", "").strip()
|
||
if token:
|
||
return await _verifyTokenAsync(token)
|
||
|
||
return None
|
||
|
||
# ──────────────────────────────────────────────
|
||
# HTTP Handlers
|
||
# ──────────────────────────────────────────────
|
||
|
||
def _initProfilesDb(self) -> None:
    """Ensure the ``deepview_user_profiles`` table (extra user profile fields) exists.

    Creates the table when missing and then tries to add the ``real_name``
    column for databases created before that column existed. Any failure is
    logged as a warning and never raised, so construction of the server
    continues.
    """
    try:
        from hermes_state import SessionDB

        db = SessionDB()
        with db._lock:
            db._conn.execute("""
                CREATE TABLE IF NOT EXISTS deepview_user_profiles (
                    user_id TEXT PRIMARY KEY,
                    real_name TEXT,
                    company TEXT,
                    role TEXT,
                    voice_url TEXT,
                    updated_at REAL NOT NULL
                )
            """)
            # Upgrade path: add real_name to tables that predate it. On a
            # current schema the column already exists and the statement
            # fails, which is expected; log it instead of swallowing blindly.
            try:
                db._conn.execute("ALTER TABLE deepview_user_profiles ADD COLUMN real_name TEXT")
            except Exception as alterErr:
                logger.debug("[DeepviewSSE] real_name column not added: %s", alterErr)
            db._conn.commit()
    except Exception as e:
        logger.warning("[DeepviewSSE] Failed to init profiles table: %s", e)
|
||
|
||
async def _handleProfileGet(self, request: "web.Request") -> "web.Response":
    """GET /deepview/profile — return the caller's extended profile.

    Responds 401 when the caller cannot be authenticated, 500 with the error
    text when the lookup fails, and otherwise a JSON object with ``realName``,
    ``company``, ``role`` and ``voiceUrl`` (all empty strings when no row
    exists for the user).
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    try:
        from hermes_state import SessionDB

        store = SessionDB()
        with store._lock:
            cursor = store._conn.execute(
                "SELECT real_name, company, role, voice_url FROM deepview_user_profiles WHERE user_id = ?",
                (user["userId"],)
            )
            record = cursor.fetchone()

        # No stored profile → answer with blank fields rather than 404.
        values = record if record else ("", "", "", "")
        profile = {
            "realName": values[0],
            "company": values[1],
            "role": values[2],
            "voiceUrl": values[3],
        }
        return web.json_response(profile, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to get profile: %s", e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
|
||
|
||
async def _handleProfilePost(self, request: "web.Request") -> "web.Response":
    """POST /deepview/profile — create or update the caller's extended profile.

    Body: ``{ realName?, company?, role?, voiceUrl? }`` (missing fields are
    stored as empty strings). The row is upserted by ``user_id`` and
    ``updated_at`` is set to the current time.

    Responds 401 when unauthenticated, 400 when the body is not a JSON object,
    500 with the error text when the write fails, and ``{"success": true}``
    otherwise.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    # A malformed body is the client's fault: answer 400, not 500
    # (the same convention as the chat endpoint).
    try:
        data = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(data, dict):
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    try:
        real_name = data.get("realName", "")
        company = data.get("company", "")
        role = data.get("role", "")
        voice_url = data.get("voiceUrl", "")

        from hermes_state import SessionDB

        db = SessionDB()
        with db._lock:
            db._conn.execute("""
                INSERT INTO deepview_user_profiles (user_id, real_name, company, role, voice_url, updated_at)
                VALUES (?, ?, ?, ?, ?, ?)
                ON CONFLICT(user_id) DO UPDATE SET
                    real_name=excluded.real_name,
                    company=excluded.company,
                    role=excluded.role,
                    voice_url=excluded.voice_url,
                    updated_at=excluded.updated_at
            """, (user["userId"], real_name, company, role, voice_url, time.time()))
            db._conn.commit()
        return web.json_response({"success": True}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to set profile: %s", e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
|
||
|
||
async def _handleHistory(self, request: "web.Request") -> "web.Response":
    """GET /deepview/chat/history?chatId=xxx — return the stored messages of a chat.

    Responds 401 when unauthenticated, 400 when ``chatId`` is missing, 500
    with the error text when loading fails, and ``{"messages": [...]}``
    otherwise.
    """
    caller = await self._extractUser(request)
    if not caller:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    chatId = request.query.get("chatId", "").strip()
    if chatId == "":
        return web.json_response({"error": "chatId required"}, status=400, headers=_CORS_HEADERS)

    # NOTE(review): the lookup is keyed by chatId only; nothing visible here
    # checks that the chat belongs to the caller — confirm this is intended.
    try:
        from hermes_state import SessionDB

        # Messages come back as role/content dicts, e.g.
        # [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]
        conversation = SessionDB().get_messages_as_conversation(chatId)
        return web.json_response({"messages": conversation}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to load history for chatId=%s: %s", chatId, e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
|
||
|
||
async def _handleHealth(self, request: "web.Request") -> "web.Response":
    """GET /deepview/health — report service status plus user and connection counts."""
    openConnections = 0
    for tabs in self._sseClients.values():
        openConnections += len(tabs)

    summary = {
        "status": "ok",
        "service": "deepview-sse",
        "users": len(self._sseClients),
        "connections": openConnections,
    }
    return web.json_response(summary)
|
||
|
||
async def _handleSSEConnect(self, request: "web.Request") -> "web.StreamResponse":
    """
    GET /deepview/events?token=xxx&connId=yyy

    Opens the long-lived SSE stream used by the frontend GlobalSSEService.
      - token:  MindPass JWT, passed in the query string because EventSource
                cannot send custom headers.
      - connId: connection id generated by the frontend (distinguishes tabs
                of the same user); one is generated here when absent.

    After a ``connected`` event the handler forwards every frame placed on
    this connection's queue, emits a heartbeat comment after 30 s of silence,
    and stops when a ``None`` sentinel is dequeued or the socket fails.

    Returns:
        The prepared stream response, or a 401 JSON response when the caller
        cannot be authenticated.
    """
    user = await self._extractUser(request)
    if not user:
        # Carry CORS headers like every other 401 in this server.
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    userId = user["userId"]
    connId = request.query.get("connId", "").strip()
    if not connId:
        connId = f"conn_{int(time.time())}_{uuid.uuid4().hex[:6]}"

    q: "asyncio.Queue[Optional[str]]" = asyncio.Queue(maxsize=500)

    headers = {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        **_CORS_HEADERS,
    }
    response = web.StreamResponse(status=200, headers=headers)

    try:
        # Register inside the try so the finally clause always unregisters,
        # even when prepare() or the first write fails.
        conns = self._sseClients.setdefault(userId, {})
        previous = conns.get(connId)
        if previous is not None:
            # The same connId reconnected: ask the stale handler to stop.
            try:
                previous.put_nowait(None)
            except asyncio.QueueFull:
                pass
        conns[connId] = q

        await response.prepare(request)

        # Connection acknowledgement
        connectData = {"userId": userId, "connId": connId, "name": user.get("name", "")}
        connectPayload = f"event: connected\ndata: {json.dumps(connectData, ensure_ascii=False)}\n\n"
        await response.write(connectPayload.encode("utf-8"))

        logger.info("[DeepviewSSE] User %s (%s) connected via connId=%s",
                    user.get("name", "?"), userId[:8], connId)

        while True:
            try:
                payload = await asyncio.wait_for(q.get(), timeout=30.0)
            except asyncio.TimeoutError:
                # Keep-alive heartbeat
                await response.write(b": heartbeat\n\n")
                continue

            if payload is None:
                break  # disconnect sentinel

            await response.write(payload.encode("utf-8"))
    except OSError:
        # Covers ConnectionResetError / ConnectionAbortedError / BrokenPipeError.
        logger.info("[DeepviewSSE] User %s connId=%s disconnected (connection lost)",
                    userId[:8], connId)
    finally:
        # Remove only this handler's own queue: a newer connection that
        # reused the connId must survive, as must the user's other tabs.
        conns = self._sseClients.get(userId)
        if conns is not None:
            if conns.get(connId) is q:
                del conns[connId]
            if not conns:
                del self._sseClients[userId]
        logger.info("[DeepviewSSE] connId=%s cleaned up (user %s remaining conns: %d)",
                    connId, userId[:8],
                    len(self._sseClients.get(userId, {})))

    return response
|
||
|
||
async def _handleChat(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/chat
    Body: { chatId, text, contextId?, doctorId? }
    Auth: Authorization: Bearer <MindPass JWT>

    HTTP only acknowledges receipt; the actual result is delivered over SSE
    by the background ``_runChat`` task. ``userId`` always comes from the
    verified token, never from the request body.

    Responds 401 when unauthenticated, 400 for malformed input, and
    ``{"received": true}`` once the background task has been scheduled.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(body, dict):
        # Valid JSON that is not an object (list, string, ...) has no fields.
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    # userId is taken from the token; the frontend is not trusted for it.
    userId = user["userId"]
    chatId = body.get("chatId", "")
    text = body.get("text", "")
    contextId = body.get("contextId", "deepview")
    doctorId = body.get("doctorId", "doc_001")

    # Non-string values would otherwise crash on .strip() and surface as 500.
    if not isinstance(chatId, str) or not isinstance(text, str):
        return web.json_response({"error": "Missing chatId or text"},
                                 status=400, headers=_CORS_HEADERS)
    chatId = chatId.strip()
    text = text.strip()
    if not chatId or not text:
        return web.json_response({"error": "Missing chatId or text"},
                                 status=400, headers=_CORS_HEADERS)
    if not isinstance(contextId, str):
        return web.json_response({"error": "Invalid contextId"},
                                 status=400, headers=_CORS_HEADERS)

    # Acknowledge immediately; the AI run happens in the background.
    # The event loop keeps only a weak reference to tasks, so hold a strong
    # one until the task finishes or it may be garbage-collected mid-run.
    pendingTasks = getattr(self, "_chatTasks", None)
    if pendingTasks is None:
        pendingTasks = self._chatTasks = set()
    task = asyncio.create_task(self._runChat(userId, chatId, text, contextId, doctorId))
    pendingTasks.add(task)
    task.add_done_callback(pendingTasks.discard)

    return web.json_response(
        {"received": True},
        headers=_CORS_HEADERS,
    )
|
||
|
||
async def _runChat(self, userId: str, chatId: str, text: str, contextId: str, doctorId: str = "doc_001") -> None:
    """
    Run one Hermes AIAgent conversation turn in the background.

    The system prompt is the SKILL.md file (or a short built-in fallback)
    plus a context section selected by the ``contextId`` prefix:
      - ``recording:[clientId/]asrId`` → single-recording review
      - ``client:clientId``            → full client profile
      - anything else                  → general mode

    Nothing is returned over HTTP. Streaming output goes out through the
    agent callbacks, the final answer as ``agent:done``, and any exception as
    ``agent:error``.

    Args:
        userId: Verified user id; selects the storage sandbox and SSE target.
        chatId: Conversation id; used as the agent session id.
        text: The user's message.
        contextId: Context selector (see above). Path-like segments in it are
            validated because they are interpolated into file paths given to
            a file-enabled agent.
        doctorId: Accepted for interface compatibility; not used here.
    """
    def _safeSegment(value: str, label: str) -> str:
        # One plain directory/file name only: no traversal, no separators.
        if not value or value in (".", "..") or any(ch in value for ch in ("/", "\\", "\0")):
            raise ValueError(f"Invalid {label} in contextId")
        return value

    try:
        from run_agent import AIAgent
        from hermes_state import SessionDB

        if not isinstance(contextId, str):
            raise ValueError("contextId must be a string")

        db = SessionDB()
        # We are inside a coroutine, so the running loop is the right one for
        # the thread-safe callbacks below.
        loop = asyncio.get_running_loop()

        # Resolve the storage root, the user's sandbox and the org knowledge area.
        storageDir = os.getenv("DEEPVIEW_STORAGE_DIR", os.path.expanduser("~/Downloads/Coding/医生助理智能体/backend/storage"))
        userDir = self._getUserStorageDir(userId)

        # orgId is currently fixed.
        orgId = "org_001"

        orgDir = self._getOrgStorageDir(orgId)
        platformDir = os.path.join(storageDir, "platform")

        # 1. Load the single, context-independent SKILL.md.
        skillPath = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
            "skills", "deepview_assistant", "SKILL.md",
        )

        systemPrompt = "你是深维面诊智能军师。\n\n"
        if os.path.exists(skillPath):
            with open(skillPath, "r", encoding="utf-8") as f:
                systemPrompt = f.read()

        systemPrompt = systemPrompt.replace("{{STORAGE_DIR}}", storageDir)

        # 2. Route the context package by contextId prefix.
        if contextId.startswith("recording:"):
            recordingId = contextId.split(":", 1)[1]
            # "clientId/asrId" names the owning client; a bare asrId maps to
            # the placeholder client "unknown_client".
            parts = recordingId.split("/")
            clientId = _safeSegment(parts[0], "client id") if len(parts) > 1 else "unknown_client"
            asrId = _safeSegment(parts[-1], "recording id")
            asrPath = f"{userDir}/clients/{clientId}/history/{asrId}.md"

            systemPrompt += f"""

## 🎙️ 当前上下文模式:单次面诊录音复盘
- 录音 ASR 文件:{asrPath}
- 医生风格档案:{userDir}/doctor_profile.md
- 企业知识库目录:{orgDir}/wiki/
- 平台规则参考:{platformDir}/wiki/

### 你的工作重心
基于这一次面诊录音,分析信任断点、沟通体征、改进建议。
不要读取该客户的完整 profile.md(那是全景档案模式的职责)。
"""
        elif contextId.startswith("client:"):
            clientId = _safeSegment(contextId.split(":", 1)[1], "client id")
            systemPrompt += f"""

## 👤 当前上下文模式:客户全景档案
- 客户档案目录:{userDir}/clients/{clientId}/
- 核心档案:{userDir}/clients/{clientId}/profile.md
- 历史录音目录:{userDir}/clients/{clientId}/history/
- 医生风格档案:{userDir}/doctor_profile.md
- 企业知识库目录:{orgDir}/wiki/
- 平台规则参考:{platformDir}/wiki/

### 你的工作重心
基于该客户的全生命周期数据,提供诊前策略、社交杠杆分析、跨品类破冰方案。
优先读取 profile.md 获取全景概览,必要时深入 history/ 追溯原始录音细节。
"""
        else:
            systemPrompt += f"\n\n## 通用模式\n企业知识库目录:{orgDir}/wiki/\n平台规则参考:{platformDir}/wiki/\n"

        # 3. Load prior messages so the conversation is stateful.
        history = db.get_messages_as_conversation(chatId)

        agent = AIAgent(
            model=os.getenv("DEEPVIEW_MODEL", "gemini-pro-vertex"),
            enabled_toolsets=["file"],  # per SPEC: file tools only
            stream_delta_callback=self._makeStreamDeltaCallback(userId, chatId, loop),
            tool_progress_callback=self._makeToolProgressCallback(userId, chatId, loop),
            quiet_mode=True,
            platform="deepview",
            session_id=chatId,
            session_db=db,
            user_id=userId,
        )

        # run_conversation is blocking; keep it off the event loop.
        result = await loop.run_in_executor(
            None,
            lambda: agent.run_conversation(
                user_message=text,
                system_message=systemPrompt,
                conversation_history=history,
            ),
        )

        finalResponse = result.get("final_response", "")
        if not finalResponse:
            finalResponse = result.get("error", "(未生成回答)")

        # agent:done marks the end of this turn.
        self._pushEvent(userId, "agent:done", {
            "chatId": chatId,
            "fullAnswer": finalResponse,
        })

    except Exception as e:
        logger.error("[DeepviewSSE] Chat error for %s/%s: %s", userId, chatId, e, exc_info=True)
        self._pushEvent(userId, "agent:error", {
            "chatId": chatId,
            "message": f"处理失败:{str(e)}",
        })
|
||
|
||
async def _handleReportGenerate(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/report/generate
    Body: { contextId, doctorId?, presetReportId? }

    Synchronously generates a structured JSON report in two stages:
      1. A Hermes AIAgent (file tools only) writes a free-form markdown analysis.
      2. An OpenAI-compatible endpoint converts that markdown into JSON
         (``response_format`` is ``json_object``).
    The JSON is stamped with the real report id/code, written to
    ``deepview_reports_v2`` and returned in the response. No retry is
    performed: a failure in either stage answers HTTP 500.

    ``contextId`` forms:
      - ``recording:clientId/asrId`` — archived recording under the client
      - ``recording:asrId``          — inbox recording
      - ``client:clientId`` or a bare ``clientId`` — full client profile

    ``doctorId`` is accepted in the body but not used. When ``presetReportId``
    is given, the existing row with that id is updated instead of inserting.

    Responds 401 when unauthenticated and 400 for malformed input.
    """
    def _isSafeSegment(segment) -> bool:
        # One plain directory/file name only: no traversal, no separators.
        return (
            isinstance(segment, str)
            and bool(segment)
            and segment not in (".", "..")
            and not any(ch in segment for ch in ("/", "\\", "\0"))
        )

    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(body, dict):
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    contextId = body.get("contextId", "")
    presetReportId = body.get("presetReportId")

    if not contextId:
        return web.json_response({"error": "Missing contextId"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(contextId, str):
        return web.json_response({"error": "Invalid contextId"}, status=400, headers=_CORS_HEADERS)
    if presetReportId is not None and not isinstance(presetReportId, str):
        return web.json_response({"error": "Invalid presetReportId"}, status=400, headers=_CORS_HEADERS)

    # The verified user dict carries "userId" (there is no "sub" key). Looking
    # up "sub" made every request fall back to "unknown", so all users shared
    # one storage directory and one user_id in the reports table.
    userId = user["userId"]
    orgId = user.get("org", "org_001")

    # Parse contextId up front and validate every segment that will be
    # interpolated into a file path handed to the file-enabled agent.
    isRecording = contextId.startswith("recording:")
    if isRecording:
        parts = contextId.split(":", 1)[1].split("/")
        asrId = parts[-1]
        # Legacy (archived) form carries a client id; inbox form does not.
        clientId = parts[0] if len(parts) > 1 else None
        segments = [asrId] if clientId is None else [clientId, asrId]
    else:
        clientId = contextId.split(":", 1)[1] if ":" in contextId else contextId
        segments = [clientId]
    if not all(_isSafeSegment(segment) for segment in segments):
        return web.json_response({"error": "Invalid contextId"}, status=400, headers=_CORS_HEADERS)

    userDir = self._getUserStorageDir(userId)
    orgDir = self._getOrgStorageDir(orgId)

    storageDir = os.getenv("DEEPVIEW_STORAGE_DIR", os.path.expanduser("~/Downloads/Coding/医生助理智能体/backend/storage"))
    platformDir = os.path.join(storageDir, "platform")

    # Base prompt: SKILL.md when present, otherwise a short built-in line.
    systemPrompt = "你是深维面诊智能军师。\n"
    skillPath = os.path.join(
        os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
        "skills", "deepview_assistant", "SKILL.md",
    )
    if os.path.exists(skillPath):
        with open(skillPath, "r", encoding="utf-8") as f:
            systemPrompt = f.read()

    systemPrompt = systemPrompt.replace("{{STORAGE_DIR}}", storageDir)
    # Compatibility: if the prompt still references a storage-level wiki
    # path, redirect it to the organisation wiki.
    systemPrompt = systemPrompt.replace(f"{storageDir}/wiki/", f"{orgDir}/wiki/")
    systemPrompt += f"\n\n## 规则遵循\n企业知识库:{orgDir}/wiki/\n平台规则:{platformDir}/wiki/\n"

    if isRecording:
        if clientId is not None:
            # Legacy (archived) form: recording:clientId/asrId
            asrPath = f"{userDir}/clients/{clientId}/history/{asrId}.md"
            clientProfilePath = f"{userDir}/clients/{clientId}/profile.md"
        else:
            # Inbox form: recording:asrId
            asrPath = f"{userDir}/inbox/{asrId}/asr.md"
            clientProfilePath = None
        systemPrompt += f"\n## 🎙️ 第1模式:单条面诊录音复盘\n你的唯一任务是提取基于这通录音的战术复盘。\n录音路径:{asrPath}\n医生档案参考:{userDir}/doctor_profile.md"
        if clientProfilePath and os.path.exists(clientProfilePath):
            systemPrompt += f"\n客户档案参考:{clientProfilePath}"
        systemPrompt += "\n\n🚨 【系统硬约束:Speaker 天然推断指令】"
        systemPrompt += "\n原生录音 ASR 中仅含有 Speaker_1、Speaker_2 等无感情编号。请根据对话上下文(谁在做专业科普与处方,谁在表述容貌焦虑与顾虑)自然反演出真实的医生和客户对应关系。"
        systemPrompt += "\n在输出所有报告正文时,请直接使用真实姓名,彻底抹除 Speaker_X 痕迹。"
    else:
        systemPrompt += f"\n## 👤 第2模式:客户全景档案战略\n你的唯一任务是生成该客户的全景战略洞察报告。\n档案路径:{userDir}/clients/{clientId}/profile.md\n历史录音目录:{userDir}/clients/{clientId}/history/\n医生风格:{userDir}/doctor_profile.md"

    # ── Stage 1 content engine: Hermes AIAgent (md2md) ──
    # Produces the rich-text analysis only; structure is Stage 2's job.
    contentChecklist = """

## 输出必须包含的章节(缺一不可)
1. 方案接纳与信任转化评估(核心结果 + 信任指数百分制 + AI洞察段落)
2. 面诊体征数据(医患说话时间比 + 降维科普能力评价并引用比喻原句 + 信任温度曲线各阶段描述)
3. 信任断点溯源(精确到录音时间戳 + 原声对话还原 + 核心病灶诊断标签 + 深度改进分析段落)
4. D.E.E.P. 雷达评估(D诊断/E1共情/E2科普/P处方 四维1-5分评分及各维度鉴定说明 + 以军师口吻给医生的犀利忠告)
5. 行动处方(外部行动轨道:含节点名/动作/话术/目的 + 内部觉察轨道:含改变/动作/话术)
6. 报告编号(DW-AMXG-日期)、客户姓名、面诊医生、核心诉求、最终接纳度、面诊时长秒数
"""
    systemPrompt += contentChecklist

    from run_agent import AIAgent
    from hermes_state import SessionDB

    db = SessionDB()
    loop = asyncio.get_running_loop()

    try:
        agent = AIAgent(
            model=os.getenv("DEEPVIEW_MODEL", "gemini-pro-vertex"),
            enabled_toolsets=["file"],
            quiet_mode=True,
            platform="deepview",
            session_id=str(uuid.uuid4()),
            session_db=db,
            user_id=userId,
        )

        # run_conversation is blocking; keep it off the event loop.
        result = await loop.run_in_executor(
            None,
            lambda: agent.run_conversation(
                user_message="请根据给定的上下文文件,生成完整的面诊沟通X光片分析报告。按照章节 checklist 确保不遗漏任何模块。",
                system_message=systemPrompt,
            ),
        )

        mdReport = result.get("final_response", "").strip()
        logger.info("[DeepviewSSE] Stage 1 (Hermes md2md) done, length=%s chars", len(mdReport))

        if not mdReport or len(mdReport) < 100:
            return web.json_response({"error": "Stage 1 报告内容为空"}, status=500, headers=_CORS_HEADERS)

    except Exception as e:
        logger.error("[DeepviewSSE] Stage 1 error: %s", e)
        return web.json_response({"error": f"Stage 1 failed: {e}"}, status=500, headers=_CORS_HEADERS)

    # ── Stage 2 format engine: OpenAI-compatible endpoint, JSON object mode ──
    # Pure conversion: no content creation, only structural alignment. The
    # target structure is spelled out in the system prompt below (the former
    # jsonSchema dict was never passed to the API and has been removed).
    stage2SystemPrompt = """你是一个 JSON 格式转换器。将用户提供的面诊分析报告精确转换为以下 JSON 结构。
忠实保留原文内容,不要增删改。必须严格使用以下字段名(不要用中文或其他名称)。

【类型约束·极其重要】
- duration: 纯整数(秒数),如 1800,不要带单位
- trustIndex: 纯整数 0-100,如 65,不要带%
- radar 里的 d/e1/e2/p: 纯浮点数 1.0-5.0,如 4.5,绝不能写成"4.5/5 分"或"4.5分"

顶层: reportCode, patientName, doctorName, coreDemand, acceptance, duration, xray
xray.module1: result, trustIndex, coreInsight
xray.module2: ratio, metaphorScore, curveDesc(字符串数组)
xray.module3: breakpoints(数组,每项含timestamp/contextText数组/diagnosis), benchmark
xray.module4: radar(含d/d_comment/e1/e1_comment/e2/e2_comment/p/p_comment), expertAdvice
xray.module5: track1(数组,每项含node/action/strategy/purpose), track2(数组,每项含change/action/strategy)"""

    try:
        from openai import OpenAI

        litellmClient = OpenAI(
            base_url=os.getenv("GEMINI_BASE_URL", "http://127.0.0.1:4000/v1"),
            api_key=os.getenv("GEMINI_API_KEY", "sk-placeholder"),
        )

        stage2Response = await loop.run_in_executor(
            None,
            lambda: litellmClient.chat.completions.create(
                model=os.getenv("DEEPVIEW_STAGE2_MODEL", "qwen-plus"),
                messages=[
                    {"role": "system", "content": stage2SystemPrompt},
                    {"role": "user", "content": mdReport},
                ],
                response_format={"type": "json_object"},
                max_tokens=8192,
                user=userId,
                extra_body={
                    "metadata": {
                        "deepview_task": contextId,
                        "deepview_stage": "stage2_json",
                        "deepview_user": userId,
                    }
                },
            )
        )

        jsonOutput = stage2Response.choices[0].message.content.strip()
        parsedData = json.loads(jsonOutput)

        logger.info("[DeepviewSSE] Stage 2 (json_object) done, keys=%s", list(parsedData.keys()))
        xray = parsedData.get('xray', {})
        for mk in ['module1', 'module2', 'module3', 'module4', 'module5']:
            logger.info("[DeepviewSSE] xray.%s: %s", mk, '✅' if xray.get(mk) else '❌ MISSING')

        # ── Persistence: write to the DB (hermes state.db) ──
        reportId = presetReportId if presetReportId else "rep_" + uuid.uuid4().hex[:8]

        # Stamp the real identifiers over whatever the model produced.
        # reportId[4:] drops the leading "rep_" of generated ids.
        parsedData['reportCode'] = f"DW-AMXG-{reportId[4:].upper()}"
        parsedData['id'] = reportId
        parsedData['context_id'] = contextId

        try:
            reportDb = SessionDB()
            reportJson = json.dumps(parsedData, ensure_ascii=False)
            with reportDb._lock:
                if presetReportId:
                    # NOTE(review): this update is keyed by report_id only —
                    # confirm whether it should also be scoped to user_id.
                    reportDb._conn.execute(
                        "UPDATE deepview_reports_v2 SET status='completed', report_json=?, created_at=? WHERE report_id=?",
                        (reportJson, time.time(), presetReportId)
                    )
                else:
                    reportDb._conn.execute(
                        "INSERT OR REPLACE INTO deepview_reports_v2 (report_id, context_id, user_id, report_json, created_at, status) VALUES (?, ?, ?, ?, ?, 'completed')",
                        (reportId, contextId, userId, reportJson, time.time())
                    )
                reportDb._conn.commit()
            logger.info("[DeepviewSSE] Report persisted to DB: %s -> %s", contextId, reportId)
        except Exception as dbErr:
            # Best effort: the report is still returned when persistence fails.
            logger.error("[DeepviewSSE] DB persist failed: %s", dbErr)

        safeBody = json.dumps({"success": True, "reportId": reportId, "data": parsedData}, ensure_ascii=False)
        return web.Response(text=safeBody, content_type="application/json", headers=_CORS_HEADERS)

    except Exception as e:
        logger.error("[DeepviewSSE] Stage 2 error: %s", e)
        return web.json_response({"error": f"Stage 2 (json conversion) failed: {e}"}, status=500, headers=_CORS_HEADERS)
|
||
|
||
async def _handleReportGet(self, request: "web.Request") -> "web.Response":
    """
    GET /deepview/report/get?reportId=rep_xxx

    Return one persisted report from the deepview_reports_v2 table.

    The stored JSON is decoded and its ``reportCode``, ``id`` and
    ``context_id`` fields are overwritten with values derived from the row,
    so older reports carrying model-invented identifiers are corrected on
    read. ``clientId`` is added to the payload when the row has one.

    Access control: unless DEEPVIEW_AUTH_DEV_MODE is set, the report must
    belong to the authenticated user. A report owned by someone else gets the
    same 404 as a missing one, so its existence is not disclosed.

    Responses:
        200 {"success": True, "data": {...}}
        400 missing reportId
        401 unauthenticated
        404 no accessible report with that id
        500 storage or decoding failure
    """
    if request.method == "OPTIONS":
        return web.Response(status=200, headers=_CORS_HEADERS)
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    reportId = request.query.get("reportId", "")
    if not reportId:
        return web.json_response({"error": "Missing reportId"}, status=400, headers=_CORS_HEADERS)

    try:
        from hermes_state import SessionDB
        db = SessionDB()
        with db._lock:
            cursor = db._conn.execute(
                "SELECT report_json, created_at, client_id, context_id, user_id "
                "FROM deepview_reports_v2 WHERE report_id = ?",
                (reportId,)
            )
            row = cursor.fetchone()

        # Ownership check. Dev mode bypasses it, mirroring the per-user filter
        # in _handleSessionsList.
        devMode = os.environ.get("DEEPVIEW_AUTH_DEV_MODE", "")
        if row and not devMode and row[4] != user.get("userId"):
            logger.warning("[DeepviewSSE] Report %s requested by a non-owner; answering 404", reportId)
            row = None

        if not row:
            return web.json_response({"error": "Report not found", "reportId": reportId}, status=404, headers=_CORS_HEADERS)

        reportData = json.loads(row[0])
        clientId = row[2]
        contextId = row[3]

        # Hot patch reportCode for older generated reports that might have hallucinated ones.
        # reportId[4:] drops the 4-character "rep_" prefix.
        reportData["reportCode"] = f"DW-AMXG-{reportId[4:].upper()}"
        reportData["id"] = reportId
        reportData["context_id"] = contextId

        # Attach clientId into the data payload so the frontend knows if it's archived
        if clientId:
            reportData["clientId"] = clientId

        # Serialized by hand so non-ASCII text is emitted verbatim (ensure_ascii=False).
        safeBody = json.dumps({"success": True, "data": reportData}, ensure_ascii=False)
        return web.Response(text=safeBody, content_type="application/json", headers=_CORS_HEADERS)
    except Exception as e:
        logger.error(f"[DeepviewSSE] Report get error: {e}")
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
|
||
|
||
async def _handleReportsList(self, request: "web.Request") -> "web.Response":
    """
    GET /deepview/reports/list

    Return every report row owned by the authenticated user, newest first
    (ORDER BY created_at DESC). Rows are returned whatever their status, so
    placeholder rows that are still being generated are included.

    Each item is {id, contextId, status, createdAt, clientId, data}. ``data``
    is the decoded report JSON, or {} when the stored text cannot be decoded.

    Responses: 200 {"success": True, "reports": [...]}, 401, 500.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    try:
        from hermes_state import SessionDB
        db = SessionDB()
        with db._lock:
            cursor = db._conn.execute(
                "SELECT report_id, context_id, report_json, created_at, status, client_id FROM deepview_reports_v2 WHERE user_id = ? ORDER BY created_at DESC",
                (user["userId"],)
            )
            rows = cursor.fetchall()

        reports = []
        for row in rows:
            rep_id, ctx_id, r_json, c_time, status, client_id = row
            try:
                data = json.loads(r_json)
            except (TypeError, ValueError):
                # Only decoding problems are tolerated here (JSONDecodeError is a
                # ValueError); one corrupt row must not hide the rest of the list.
                data = {}
            reports.append({
                "id": rep_id,
                "contextId": ctx_id,
                "status": status,
                "createdAt": c_time,
                "clientId": client_id,
                "data": data
            })

        return web.json_response({"success": True, "reports": reports}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error(f"[DeepviewSSE] Reports list error: {e}")
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
|
||
|
||
async def _handleMaterialsUploadToken(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/materials/upload_token
    Body: { filename? }

    Issue a pre-signed OSS PUT URL so the browser can upload a file directly.

    The object key is ``deepview-raw/<userId>/<random hex><ext>``, where
    ``ext`` is the lower-cased extension of the supplied filename. The URL is
    signed for 3600 seconds with Content-Type ``application/octet-stream``.

    Body parsing is best-effort: an unreadable or non-object body, or a
    non-string filename, falls back to the default filename "unknown.pdf".

    Responses:
        200 {"putUrl", "ossKey", "filename"}
        401 unauthenticated
        500 OSS credentials missing, or signing failed
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    try:
        body = await request.json()
    except Exception:
        # `except Exception` rather than a bare except, so cancellation of the
        # request task is not swallowed while awaiting the body.
        body = {}
    if not isinstance(body, dict):
        body = {}

    filename = body.get("filename", "unknown.pdf")
    if not isinstance(filename, str):
        # os.path.splitext would raise on a non-string value.
        filename = "unknown.pdf"

    ext = os.path.splitext(filename)[1].lower()
    file_uid = uuid.uuid4().hex
    object_key = f"deepview-raw/{user['userId']}/{file_uid}{ext}"

    oss_ak = os.getenv("ALIYUN_ACCESS_KEY_ID", "")
    oss_sk = os.getenv("ALIYUN_ACCESS_KEY_SECRET", "")
    oss_bucket = os.getenv("OSS_BUCKET", "meetings-dev")
    oss_endpoint = os.getenv("OSS_ENDPOINT", "oss-cn-beijing.aliyuncs.com")

    if not all([oss_ak, oss_sk]):
        # Fail fast when the server has no OSS credentials (e.g. local testing).
        return web.json_response({"error": "Server missing OSS credentials (ALIYUN_ACCESS_KEY_ID)"}, status=500, headers=_CORS_HEADERS)

    try:
        import oss2
        auth = oss2.Auth(oss_ak, oss_sk)
        bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket)

        # Content-Type must be part of the signature; otherwise the browser's
        # default MIME type makes OSS reject the upload with 403.
        headers = {'Content-Type': 'application/octet-stream'}
        put_url = bucket.sign_url("PUT", object_key, 3600, headers=headers)
    except Exception as e:
        # Report SDK import/signing problems as JSON with CORS headers, like the
        # other handlers, instead of letting them escape the handler.
        logger.error("[DeepviewSSE] Upload token signing failed: %s", e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)

    # Always hand out an https URL.
    if put_url.startswith("http://"):
        put_url = put_url.replace("http://", "https://", 1)

    return web.json_response({
        "putUrl": put_url,
        "ossKey": object_key,
        "filename": filename
    }, headers=_CORS_HEADERS)
|
||
|
||
async def _handleMaterialsConfirm(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/materials/confirm
    Body: { ossKey, contextId?, filename? }

    Called by the frontend after its direct OSS PUT succeeded. The handler
    answers immediately and runs the processing pipeline in a background task.

    Pipeline:
      1. process_uploaded_material_oss(...) handles the uploaded object.
      2. Only when contextId starts with "recording:": POST to this host's
         /deepview/report/generate with the pre-allocated report id, then push
         a "report:ready" event to the user on success.

    For "recording:" contexts a placeholder row (status 'processing') is
    inserted up front. Any pipeline failure marks that row 'failed', so it
    cannot stay in 'processing' forever.

    Responses:
        200 {"received": True, "reportId": <id or None>}
        400 invalid JSON, missing ossKey, or non-string contextId
        401 unauthenticated
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    try:
        body = await request.json()
    except Exception:
        # `except Exception` rather than a bare except, so cancellation of the
        # request task is not turned into a 400.
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(body, dict):
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    oss_key = body.get("ossKey")
    context_id = body.get("contextId", "deepview")
    filename = body.get("filename", "unknown.pdf")

    if not oss_key:
        return web.json_response({"error": "Missing ossKey"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(context_id, str):
        # An explicit null or number would crash on .startswith() below.
        return web.json_response({"error": "Invalid contextId"}, status=400, headers=_CORS_HEADERS)

    # Offload to background task
    try:
        from .deepview_materials import process_uploaded_material_oss
    except ImportError:
        from gateway.platforms.deepview_materials import process_uploaded_material_oss

    is_recording = context_id.startswith("recording:")
    report_id = "rep_" + uuid.uuid4().hex[:8]
    user_id = user["userId"]
    # Read the header now; the background task outlives this request.
    auth_header = request.headers.get("Authorization", "")

    # INSERT placeholder INTO deepview_reports_v2
    if is_recording:
        try:
            from hermes_state import SessionDB
            db = SessionDB()
            with db._lock:
                db._conn.execute(
                    "INSERT INTO deepview_reports_v2 (report_id, context_id, user_id, report_json, created_at, status) VALUES (?, ?, ?, '{}', ?, 'processing')",
                    (report_id, context_id, user_id, time.time())
                )
                db._conn.commit()
        except Exception as e:
            logger.error(f"Failed to create processing report placeholder: {e}")

    def _markReportFailed() -> None:
        """Flip the placeholder row to 'failed'; log instead of raising."""
        try:
            from hermes_state import SessionDB
            failDb = SessionDB()
            with failDb._lock:
                failDb._conn.execute("UPDATE deepview_reports_v2 SET status='failed' WHERE report_id=?", (report_id,))
                failDb._conn.commit()
        except Exception as markErr:
            logger.error(f"[DeepviewSSE] Failed to mark report {report_id} as failed: {markErr}")

    async def _wrapper():
        # Set once generation succeeded, so a later error (e.g. while pushing the
        # event) cannot overwrite a completed report with 'failed'.
        generated = False
        try:
            # 1. ASR stage
            await process_uploaded_material_oss(
                oss_key=oss_key,
                original_filename=filename,
                context_id=context_id,
                push_event_fn=self._pushEvent,
                user_id=user_id,
                org_id=user.get("org", "org_001")
            )
            if not is_recording:
                return

            # 2. X-ray stage (Auto orchestrate)
            # NOTE(review): the target port comes from PORT (default 8645), not
            # from this server's own self._port; confirm that is intended.
            port = int(os.environ.get('PORT', 8645))
            import aiohttp
            async with aiohttp.ClientSession() as session:
                url = f"http://127.0.0.1:{port}/deepview/report/generate"
                headers = {"Authorization": auth_header}
                logger.info(f"[DeepviewSSE] Auto-triggering report generation internally for {context_id} -> {report_id}")
                async with session.post(url, json={"contextId": context_id, "doctorId": "doc_001", "presetReportId": report_id}, headers=headers) as resp:
                    res_json = await resp.json()

            if res_json.get("success"):
                generated = True
                self._pushEvent(user_id, "report:ready", {"reportId": report_id})
            else:
                logger.error(f"[DeepviewSSE] Internal generate failed: {res_json}")
                # Mark as failed in DB
                _markReportFailed()
        except Exception as e:
            logger.error(f"[DeepviewSSE] Wrapper pipeline failed: {e}")
            if is_recording and not generated:
                _markReportFailed()

    # The event loop keeps only a weak reference to tasks; hold a strong one
    # until the pipeline finishes so it cannot be garbage-collected mid-run.
    pendingTasks = getattr(self, "_backgroundTasks", None)
    if pendingTasks is None:
        pendingTasks = self._backgroundTasks = set()
    task = asyncio.create_task(_wrapper())
    pendingTasks.add(task)
    task.add_done_callback(pendingTasks.discard)

    return web.json_response({"received": True, "reportId": report_id if is_recording else None}, headers=_CORS_HEADERS)
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 会话列表 API
|
||
# ──────────────────────────────────────────────
|
||
|
||
async def _handleSessionsList(self, request: "web.Request") -> "web.Response":
    """
    GET /deepview/sessions/list

    List the authenticated user's deepview sessions from SessionDB.
    Returns {sessions: [{id, title, startedAt, messageCount, preview}]}.

    Auto-migration: for any user other than "dev_user_001", deepview sessions
    whose user_id is NULL or "dev_user_001" are first re-assigned to the
    caller. A migration failure is logged and rolled back; the listing still
    proceeds.

    Only the 100 sessions returned by list_sessions_rich are considered, and
    the per-user filter is skipped entirely when DEEPVIEW_AUTH_DEV_MODE is set.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    userId = user["userId"]

    try:
        from hermes_state import SessionDB
        db = SessionDB()

        # Adopt orphaned / dev-owned deepview sessions for a real (non-dev) user.
        if userId != "dev_user_001":
            try:
                conn = db._conn
                with db._lock:
                    conn.execute("BEGIN IMMEDIATE")
                    migrated = conn.execute(
                        "UPDATE sessions SET user_id = ? "
                        "WHERE source = 'deepview' AND (user_id IS NULL OR user_id = 'dev_user_001')",
                        (userId,)
                    ).rowcount
                    conn.commit()
                if migrated > 0:
                    logger.info("[DeepviewSSE] Auto-migrated %d orphan sessions to userId=%s",
                                migrated, userId[:8])
            except Exception as e:
                logger.warning("[DeepviewSSE] Session migration failed: %s", e)
                try:
                    conn.rollback()
                except Exception:
                    pass

        # Fetch deepview sessions, then keep the caller's own (all of them in dev mode).
        rawSessions = db.list_sessions_rich(source="deepview", limit=100)
        devMode = os.environ.get("DEEPVIEW_AUTH_DEV_MODE", "")
        sessions = [
            {
                "id": entry["id"],
                "title": entry.get("title") or entry.get("preview") or "新对话",
                "startedAt": entry.get("started_at"),
                "messageCount": entry.get("message_count", 0),
                "preview": entry.get("preview", ""),
            }
            for entry in rawSessions
            if entry.get("user_id") == userId or devMode
        ]

        return web.json_response({"sessions": sessions}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to list sessions for userId=%s: %s", userId, e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 报告持久化
|
||
# ──────────────────────────────────────────────
|
||
|
||
def _initReportsDb(self) -> None:
    """Ensure the deepview_reports_v2 table, its newer columns and its user index exist.

    Works on the database opened by hermes_state.SessionDB. Tables created
    before the ``status`` / ``client_id`` columns existed get them added via
    ALTER TABLE. Any failure is logged as a warning and not raised.
    """
    createTableSql = """
        CREATE TABLE IF NOT EXISTS deepview_reports_v2 (
            report_id TEXT PRIMARY KEY,
            context_id TEXT NOT NULL,
            user_id TEXT NOT NULL,
            report_json TEXT NOT NULL,
            created_at REAL NOT NULL,
            status TEXT DEFAULT 'completed',
            client_id TEXT DEFAULT NULL
        )
    """
    createIndexSql = """
        CREATE INDEX IF NOT EXISTS idx_deepview_reports_v2_user
        ON deepview_reports_v2(user_id)
    """
    # column name -> statement that adds it to a pre-existing table
    columnMigrations = (
        ('status', "ALTER TABLE deepview_reports_v2 ADD COLUMN status TEXT DEFAULT 'completed'"),
        ('client_id', "ALTER TABLE deepview_reports_v2 ADD COLUMN client_id TEXT DEFAULT NULL"),
    )
    try:
        from hermes_state import SessionDB
        db = SessionDB()
        with db._lock:
            conn = db._conn
            conn.execute(createTableSql)

            # PRAGMA table_info rows carry the column name at index 1.
            existing = [info[1] for info in conn.execute("PRAGMA table_info(deepview_reports_v2)").fetchall()]
            for columnName, alterSql in columnMigrations:
                if columnName not in existing:
                    conn.execute(alterSql)

            conn.execute(createIndexSql)
            conn.commit()
        logger.info("[DeepviewSSE] Reports table ready")
    except Exception as e:
        logger.warning("[DeepviewSSE] Failed to init reports table: %s", e)
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 钉选板持久化
|
||
# ──────────────────────────────────────────────
|
||
|
||
def _initPinsDb(self) -> None:
    """Ensure the deepview_pinned_items table and its (user_id, context_id) index exist.

    Works on the database opened by hermes_state.SessionDB. Any failure is
    logged as a warning and not raised.
    """
    ddlStatements = (
        """
        CREATE TABLE IF NOT EXISTS deepview_pinned_items (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            title TEXT,
            snippet TEXT,
            context_id TEXT DEFAULT 'deepview',
            chat_id TEXT,
            created_at REAL NOT NULL
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_deepview_pins_user
        ON deepview_pinned_items(user_id, context_id)
        """,
    )
    try:
        from hermes_state import SessionDB
        db = SessionDB()
        with db._lock:
            for ddl in ddlStatements:
                db._conn.execute(ddl)
            db._conn.commit()
        logger.info("[DeepviewSSE] Pinned items table ready")
    except Exception as e:
        logger.warning("[DeepviewSSE] Failed to init pins table: %s", e)
|
||
|
||
async def _handlePinsList(self, request: "web.Request") -> "web.Response":
    """GET /deepview/pins/list — return all pinned items of the authenticated user, newest first.

    Each pin is {id, title, snippet, date, contextId, chatId}; ``date`` is the
    creation time rendered as MM-DD. Responses: 200 {"pins": [...]}, 401, 500.
    """
    import datetime

    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    userId = user["userId"]

    def _toPin(record) -> dict:
        """Convert one DB row into the payload shape the frontend consumes."""
        fields = dict(record)
        created = datetime.datetime.fromtimestamp(fields["created_at"])
        return {
            "id": fields["id"],
            "title": fields["title"] or "提取结论",
            "snippet": fields["snippet"] or "",
            "date": created.strftime("%m-%d"),
            "contextId": fields["context_id"],
            "chatId": fields["chat_id"] or "",
        }

    try:
        from hermes_state import SessionDB
        db = SessionDB()
        with db._lock:
            rows = db._conn.execute(
                "SELECT id, title, snippet, context_id, chat_id, created_at "
                "FROM deepview_pinned_items WHERE user_id = ? ORDER BY created_at DESC",
                (userId,)
            ).fetchall()

        pins = [_toPin(record) for record in rows]
        return web.json_response({"pins": pins}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to list pins: %s", e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
|
||
|
||
async def _handlePinsAdd(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/pins/add
    Body: { id, title, snippet, contextId, chatId }

    Store a pinned item for the authenticated user.

    A pin with the same user, context, title and snippet is not inserted
    again; the existing id is returned with {"duplicate": True} instead.

    When the client supplies no usable id, one is generated from the current
    time plus a random suffix. The id is the table's primary key across all
    users, so a bare per-second timestamp would collide whenever two pins are
    added within the same second.

    Responses:
        200 {"success": True, "id"} or {"duplicate": True, "id"}
        400 invalid JSON
        401 unauthenticated
        500 storage failure
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(body, dict):
        # A JSON array or scalar has no .get(); treat it as a malformed request.
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    userId = user["userId"]
    # `or` also replaces an explicit null / empty id with a generated one.
    pinId = body.get("id") or f"pin_{int(time.time())}_{uuid.uuid4().hex[:8]}"
    title = body.get("title", "提取结论")
    snippet = body.get("snippet", "")
    contextId = body.get("contextId", "deepview")
    chatId = body.get("chatId", "")

    try:
        from hermes_state import SessionDB
        db = SessionDB()

        # De-duplicate: same user + context + title + snippet is stored only once.
        with db._lock:
            existing = db._conn.execute(
                "SELECT id FROM deepview_pinned_items "
                "WHERE user_id = ? AND context_id = ? AND title = ? AND snippet = ?",
                (userId, contextId, title, snippet)
            ).fetchone()

        if existing:
            return web.json_response({"duplicate": True, "id": existing["id"]}, headers=_CORS_HEADERS)

        def _do(conn):
            conn.execute(
                "INSERT INTO deepview_pinned_items (id, user_id, title, snippet, context_id, chat_id, created_at) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                (pinId, userId, title, snippet, contextId, chatId, time.time())
            )
        db._execute_write(_do)

        return web.json_response({"success": True, "id": pinId}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to add pin: %s", e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
|
||
|
||
async def _handlePinsRemove(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/pins/remove
    Body: { id }

    Delete one pinned item. The DELETE is scoped to the authenticated user's
    id, so another user's pin is never removed. The response is
    {"success": True} whether or not a row matched.

    Responses: 200, 400 (invalid JSON or missing id), 401, 500.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    try:
        payload = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    ownerId = user["userId"]
    targetId = payload.get("id", "")
    if not targetId:
        return web.json_response({"error": "Missing id"}, status=400, headers=_CORS_HEADERS)

    deleteSql = "DELETE FROM deepview_pinned_items WHERE id = ? AND user_id = ?"

    try:
        from hermes_state import SessionDB
        store = SessionDB()

        def _deletePin(conn):
            conn.execute(deleteSql, (targetId, ownerId))

        store._execute_write(_deletePin)
        return web.json_response({"success": True}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to remove pin: %s", e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 素材文件持久化
|
||
# ──────────────────────────────────────────────
|
||
|
||
# Base knowledge-base files, pre-seeded into the DB so that all materials are
# managed in one place (the original note attributes this set to "驨总").
# Each entry is (id, filename, context_id), the order in which
# _initMaterialsDb unpacks the tuples when seeding deepview_materials.
_DEEPVIEW_BASE_FILES = [
    ("xz_0", "00_前言.md", "deepview"),
    ("xz_1", "01_第一章 品牌机会论证.md", "deepview"),
    ("xz_2", "02_第二章 原点市场与产品概念.md", "deepview"),
    ("xz_3", "03_第三章 品牌九要素之语言表达.md", "deepview"),
    ("xz_4", "04_第四章 品牌九要素之视觉表达.md", "deepview"),
    ("xz_5", "05_第五章 产品设计、广告与公关建品牌.md", "deepview"),
    ("xz_6", "06_第六章 新产品上市之品牌试错期.md", "deepview"),
    ("xz_7", "07_第七章 品牌的增长.md", "deepview"),
    ("xz_8", "08_第八章 连锁合作谈判.md", "deepview"),
    ("xz_9", "09_第九章 打赢商战.md", "deepview"),
]
|
||
|
||
def _initMaterialsDb(self) -> None:
    """Ensure the deepview_materials table and its context index exist, then seed the base files.

    Every entry of _DEEPVIEW_BASE_FILES is inserted with source 'base' and
    created_at 0. INSERT OR IGNORE leaves rows that already exist untouched.
    Any failure is logged as a warning and not raised.
    """
    createTableSql = """
        CREATE TABLE IF NOT EXISTS deepview_materials (
            id TEXT PRIMARY KEY,
            filename TEXT NOT NULL,
            context_id TEXT DEFAULT 'deepview',
            user_id TEXT,
            source TEXT DEFAULT 'base',
            created_at REAL NOT NULL
        )
    """
    createIndexSql = """
        CREATE INDEX IF NOT EXISTS idx_deepview_materials_ctx
        ON deepview_materials(context_id)
    """
    seedSql = (
        "INSERT OR IGNORE INTO deepview_materials (id, filename, context_id, source, created_at) "
        "VALUES (?, ?, ?, 'base', ?)"
    )
    try:
        from hermes_state import SessionDB
        db = SessionDB()
        with db._lock:
            conn = db._conn
            conn.execute(createTableSql)
            conn.execute(createIndexSql)
            # Seed the base knowledge-base files; created_at=0 marks a base file.
            for baseFile in self._DEEPVIEW_BASE_FILES:
                fileId, fileName, contextId = baseFile
                conn.execute(seedSql, (fileId, fileName, contextId, 0))
            conn.commit()
        logger.info("[DeepviewSSE] Materials table ready")
    except Exception as e:
        logger.warning("[DeepviewSSE] Failed to init materials table: %s", e)
|
||
|
||
async def _handleMaterialsList(self, request: "web.Request") -> "web.Response":
    """
    GET /deepview/materials/list?contextId=deepview

    Return every material row stored for the given context (default
    "deepview"), ordered by created_at ascending. The query filters on
    context_id only.

    Each file is {id, name, projectId, source}, where ``projectId`` carries
    the row's context_id. Responses: 200 {"files": [...]}, 401, 500.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    contextId = request.query.get("contextId", "deepview")

    try:
        from hermes_state import SessionDB
        db = SessionDB()
        with db._lock:
            rows = db._conn.execute(
                "SELECT id, filename, context_id, source "
                "FROM deepview_materials WHERE context_id = ? ORDER BY created_at ASC",
                (contextId,)
            ).fetchall()

        records = (dict(r) for r in rows)
        files = [
            {
                "id": record["id"],
                "name": record["filename"],
                "projectId": record["context_id"],
                "source": record["source"],
            }
            for record in records
        ]

        return web.json_response({"files": files}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to list materials: %s", e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 客户信息 API (Phase 4)
|
||
# ──────────────────────────────────────────────
|
||
|
||
async def _handleClientsList(self, request: "web.Request") -> "web.Response":
    """
    GET /deepview/clients/list

    List client records by scanning the ``clients`` directory inside the
    caller's storage directory. Every sub-directory is one client; its name
    is used as both id and display name.

    Each item is {id, name, hasProfile}; ``hasProfile`` tells whether the
    directory contains a profile.md. Responses: 200 {"clients": [...]}, 401.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    # NOTE(review): the storage dir is keyed by user["sub"], while the DB-backed
    # handlers use user["userId"]; confirm _extractUser provides "sub",
    # otherwise every user resolves to the shared "unknown" directory.
    storageRoot = self._getUserStorageDir(user.get("sub", "unknown"))
    clientsDir = os.path.join(storageRoot, "clients")

    entries = os.listdir(clientsDir) if os.path.exists(clientsDir) else []
    clients = [
        {
            "id": entry,
            "name": entry,
            "hasProfile": os.path.exists(os.path.join(clientsDir, entry, "profile.md"))
        }
        for entry in entries
        if os.path.isdir(os.path.join(clientsDir, entry))
    ]

    return web.json_response({"clients": clients}, headers=_CORS_HEADERS)
|
||
async def _handleClientCreate(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/clients/create
    Body: { name, phone? }

    Create a client record on the fly: allocate an id of the form
    ``p_<8 hex chars>``, create ``clients/<id>/`` inside the caller's storage
    directory and write an initial profile.md with the name, phone and
    creation date.

    ``name`` must be a non-empty string. ``phone`` may arrive as a string or a
    number (JSON clients often send digits unquoted) and is stored as text.

    Responses:
        200 {"id", "name", "phone"}
        400 invalid JSON or missing name
        401 unauthenticated
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    try:
        body = await request.json()
    except Exception:
        # `except Exception` rather than a bare except, so cancellation of the
        # request task is not turned into a 400.
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(body, dict):
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    rawName = body.get("name", "")
    rawPhone = body.get("phone", "")
    # Non-string values used to crash on .strip(); a non-string name is now
    # treated as missing and the phone is coerced to text.
    name = rawName.strip() if isinstance(rawName, str) else ""
    phone = "" if rawPhone is None else str(rawPhone).strip()

    if not name:
        return web.json_response({"error": "Name is required"}, status=400, headers=_CORS_HEADERS)

    clientId = f"p_{uuid.uuid4().hex[:8]}"

    # NOTE(review): the storage dir is keyed by user["sub"]; confirm
    # _extractUser provides it, otherwise all users share "unknown".
    userDir = self._getUserStorageDir(user.get("sub", "unknown"))
    clientDir = os.path.join(userDir, "clients", clientId)
    os.makedirs(clientDir, exist_ok=True)

    profileData = f"# 客户基本档案\n姓名:{name}\n手机号/尾号:{phone}\n建档时间:{time.strftime('%Y-%m-%d')}\n"
    with open(os.path.join(clientDir, "profile.md"), "w", encoding="utf-8") as f:
        f.write(profileData)

    return web.json_response({
        "id": clientId,
        "name": name,
        "phone": phone
    }, headers=_CORS_HEADERS)
|
||
|
||
async def _handleClientProfile(self, request: "web.Request") -> "web.Response":
    """
    GET /deepview/clients/{id}/profile

    Read ``clients/<id>/profile.md`` from the caller's storage directory and
    return its text. A client without a profile file yields empty content.

    The id is taken from the URL and becomes a path component, so it must be
    a single plain directory name. Ids that are "." / ".." or contain a path
    separator or NUL byte are rejected; without this check a percent-encoded
    "../" sequence could reach profile.md files outside the caller's own
    ``clients`` directory.

    Responses:
        200 {"id", "profileContent"}
        400 missing or invalid client id
        401 unauthenticated
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    clientId = request.match_info.get("id", "")
    if not clientId:
        return web.json_response({"error": "Missing client ID"}, status=400, headers=_CORS_HEADERS)

    # Path-traversal guard: the id must name exactly one directory level.
    if clientId in (".", "..") or any(ch in clientId for ch in ("/", "\\", "\x00")):
        return web.json_response({"error": "Invalid client ID"}, status=400, headers=_CORS_HEADERS)

    # NOTE(review): the storage dir is keyed by user["sub"]; confirm
    # _extractUser provides it, otherwise all users share "unknown".
    userDir = self._getUserStorageDir(user.get("sub", "unknown"))
    profilePath = os.path.join(userDir, "clients", clientId, "profile.md")

    content = ""
    if os.path.exists(profilePath):
        with open(profilePath, "r", encoding="utf-8") as f:
            content = f.read()

    return web.json_response({
        "id": clientId,
        "profileContent": content
    }, headers=_CORS_HEADERS)
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 报告归档 (Inbox → Client)
|
||
# ──────────────────────────────────────────────
|
||
|
||
async def _handleReportArchive(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/reports/archive
    Body: { reportId, clientId }

    Archive an inbox report under a client:
      1. Move ``inbox/<reportId>/`` to ``clients/<clientId>/history/<reportId>/``
         inside the caller's storage directory, if the inbox directory exists.
      2. Update the caller's deepview_reports_v2 row: set client_id and
         rewrite context_id to ``recording:<clientId>/<reportId>``.

    Both ids come from the request body and become path components, so each
    must be a single plain directory name. Ids that are "." / ".." or contain
    a path separator or NUL byte are rejected; otherwise "../" sequences could
    move arbitrary directories around the storage tree.

    Responses:
        200 {"success": True, "reportId", "clientId"}
        400 invalid JSON, missing ids, or unsafe ids
        401 unauthenticated
        500 file move or DB update failed
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    try:
        body = await request.json()
    except Exception:
        # `except Exception` rather than a bare except, so cancellation of the
        # request task is not turned into a 400.
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(body, dict):
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    def _cleanId(value) -> str:
        """Return the stripped id, or "" when the field is not a string."""
        return value.strip() if isinstance(value, str) else ""

    def _isSafeSegment(segment: str) -> bool:
        """Tell whether the id is usable as exactly one path component."""
        return segment not in (".", "..") and not any(ch in segment for ch in ("/", "\\", "\x00"))

    reportId = _cleanId(body.get("reportId", ""))
    clientId = _cleanId(body.get("clientId", ""))

    if not reportId or not clientId:
        return web.json_response({"error": "Missing reportId or clientId"}, status=400, headers=_CORS_HEADERS)
    if not (_isSafeSegment(reportId) and _isSafeSegment(clientId)):
        return web.json_response({"error": "Invalid reportId or clientId"}, status=400, headers=_CORS_HEADERS)

    userId = user["userId"]
    # NOTE(review): the storage dir is keyed by user["sub"] while the DB row is
    # matched on user["userId"]; confirm _extractUser provides "sub".
    userDir = self._getUserStorageDir(user.get("sub", "unknown"))

    # 1. Move the files: inbox/{reportId}/ -> clients/{clientId}/history/{reportId}/
    import shutil
    inboxDir = os.path.join(userDir, "inbox", reportId)
    targetDir = os.path.join(userDir, "clients", clientId, "history", reportId)

    if os.path.exists(inboxDir):
        try:
            os.makedirs(os.path.dirname(targetDir), exist_ok=True)
            shutil.move(inboxDir, targetDir)
        except OSError as moveErr:
            # shutil.Error is an OSError subclass; answer with JSON like the DB step does.
            logger.error(f"[DeepviewSSE] Archive file move failed: {moveErr}")
            return web.json_response({"error": str(moveErr)}, status=500, headers=_CORS_HEADERS)
        logger.info(f"[DeepviewSSE] Archived {inboxDir} → {targetDir}")

    # 2. Update the DB row: record client_id and rewrite context_id
    try:
        from hermes_state import SessionDB
        db = SessionDB()
        with db._lock:
            db._conn.execute(
                "UPDATE deepview_reports_v2 SET client_id=?, context_id=? WHERE report_id=? AND user_id=?",
                (clientId, f"recording:{clientId}/{reportId}", reportId, userId)
            )
            db._conn.commit()
        logger.info(f"[DeepviewSSE] DB archived: {reportId} → client {clientId}")
    except Exception as e:
        logger.error(f"[DeepviewSSE] Archive DB update failed: {e}")
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)

    return web.json_response({"success": True, "reportId": reportId, "clientId": clientId}, headers=_CORS_HEADERS)
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 服务器生命周期
|
||
# ──────────────────────────────────────────────
|
||
|
||
async def start(self) -> None:
    """Build the aiohttp application, register all /deepview routes and start listening.

    Raises:
        RuntimeError: when aiohttp could not be imported.
    """
    if not AIOHTTP_AVAILABLE:
        raise RuntimeError("aiohttp is required for DeepviewSSEServer")

    self._app = web.Application()
    router = self._app.router
    viaGet, viaPost = router.add_get, router.add_post

    # (registration function, path, handler), registered in exactly this order.
    routeTable = (
        (viaGet, "/deepview/health", self._handleHealth),
        (viaGet, "/deepview/chat/history", self._handleHistory),
        (viaGet, "/deepview/sessions/list", self._handleSessionsList),
        (viaGet, "/deepview/events", self._handleSSEConnect),
        (viaPost, "/deepview/chat", self._handleChat),
        (viaPost, "/deepview/report/generate", self._handleReportGenerate),
        (viaGet, "/deepview/report/get", self._handleReportGet),
        (viaGet, "/deepview/reports/list", self._handleReportsList),
        (viaGet, "/deepview/profile", self._handleProfileGet),
        (viaPost, "/deepview/profile", self._handleProfilePost),
        (viaPost, "/deepview/materials/upload_token", self._handleMaterialsUploadToken),
        (viaPost, "/deepview/materials/confirm", self._handleMaterialsConfirm),
        (viaGet, "/deepview/materials/list", self._handleMaterialsList),
        (viaGet, "/deepview/pins/list", self._handlePinsList),
        (viaPost, "/deepview/pins/add", self._handlePinsAdd),
        (viaPost, "/deepview/pins/remove", self._handlePinsRemove),
        # Client record APIs
        (viaGet, "/deepview/clients/list", self._handleClientsList),
        (viaPost, "/deepview/clients/create", self._handleClientCreate),
        (viaGet, "/deepview/clients/{id}/profile", self._handleClientProfile),
        # Report archive API (Inbox -> Client)
        (viaPost, "/deepview/reports/archive", self._handleReportArchive),
    )
    for register, path, handler in routeTable:
        register(path, handler)

    # CORS preflight for every /deepview path
    router.add_route("OPTIONS", "/deepview/{path:.*}", self._handleOptions)

    self._runner = web.AppRunner(self._app)
    await self._runner.setup()
    await web.TCPSite(self._runner, self._host, self._port).start()

    if os.environ.get("DEEPVIEW_AUTH_DEV_MODE", ""):
        authMode = "dev mode (no auth)"
    else:
        authMode = f"MindPass delegation ({MINDPASS_BASE_URL})"
    logger.info("[DeepviewSSE] Server started on http://%s:%d (auth: %s)",
                self._host, self._port, authMode)
|
||
|
||
async def stop(self) -> None:
    """Disconnect all SSE clients and shut the HTTP server down.

    Every registered client queue receives a ``None`` item (presumably the
    shutdown sentinel the SSE handler waits for; verify against
    _handleSSEConnect) before the registry is cleared.
    """
    # Snapshot the outer mapping so it can be cleared safely afterwards.
    for connections in list(self._sseClients.values()):
        for queue in connections.values():
            queue.put_nowait(None)
    self._sseClients.clear()

    if self._runner:
        await self._runner.cleanup()
        logger.info("[DeepviewSSE] Server stopped")
|
||
|
||
async def _handleOptions(self, request: "web.Request") -> "web.Response":
    """Answer a CORS preflight request with an empty 200 response carrying the CORS headers."""
    return web.Response(status=200, headers=_CORS_HEADERS)
|
||
|
||
|
||
# ── 独立启动入口 ──
|
||
async def main():
    """Run the DeepView SSE server standalone when this file is executed directly.

    Host and port come from DEEPVIEW_SSE_HOST (default 127.0.0.1) and
    DEEPVIEW_SSE_PORT (default 8643). The server is stopped on the way out,
    however the wait loop ends.
    """
    import sys

    # Prepend the directory three levels above this file to sys.path
    # (presumably the project root, so that project imports resolve).
    ancestor = os.path.abspath(__file__)
    for _ in range(3):
        ancestor = os.path.dirname(ancestor)
    sys.path.insert(0, ancestor)

    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")

    listenHost = os.getenv("DEEPVIEW_SSE_HOST", "127.0.0.1")
    listenPort = int(os.getenv("DEEPVIEW_SSE_PORT", "8643"))
    server = DeepviewSSEServer(host=listenHost, port=listenPort)
    await server.start()

    # Keep the process alive until it is interrupted.
    try:
        while True:
            await asyncio.sleep(3600)
    except KeyboardInterrupt:
        pass
    finally:
        await server.stop()
|
||
|
||
|
||
# Script entry point: start the standalone server only when this file is run
# directly, not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())
|