doctorAI/backend/gateway/platforms/deepview_sse.py

1757 lines
79 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
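DeepView consultation-agent SSE bridge adapter (deepview_sse.py).
A standalone aiohttp HTTP server that provides the DeepView consultation-agent front end with:
- GET /deepview/events — global SSE event stream (JWT authenticated)
- POST /deepview/chat — start a conversation (JWT authenticated)
- GET /deepview/health — health check
Authentication: reuses the MindPass unified auth system (the JWT sub field is used as userId).
Isolation: three-layer model — userId (identity) × connId (browser tab) × chatId (conversation).
Translation rules (based on an audit of the Hermes engine source):
- stream_delta_callback(text) → SSE agent:chunk
- stream_delta_callback(None) → [dropped]
- tool_progress("tool.started") → SSE agent:thinking
- tool_progress("tool.completed") → [dropped]
- tool_progress("reasoning.available") → [dropped]
- run_conversation() returns → SSE agent:done
- run_conversation() raises → SSE agent:error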
"""
import asyncio
import json
import logging
import os
import time
import uuid
from typing import Any, Dict, Optional
try:
from aiohttp import web, ClientSession, ClientTimeout
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
web = None # type: ignore[assignment]
ClientSession = None # type: ignore[assignment]
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# MindPass verification delegate ("path B"): this service holds no signing key and
# validates tokens by calling MindPass /api/auth/me.
MINDPASS_BASE_URL = os.getenv("MINDPASS_BASE_URL", "http://127.0.0.1:3020")

# In-memory cache so the same token does not hit MindPass repeatedly (TTL 60s).
# Maps token -> (userDict, expireTimestamp).
_tokenCache: Dict[str, tuple] = dict()
_TOKEN_CACHE_TTL = 60  # seconds
async def _verifyTokenAsync(token: str) -> Optional[dict]:
    """
    Verify a bearer token by delegating to MindPass ("path B").

    This service does not hold the JWT signing key; it calls
    ``GET {MINDPASS_BASE_URL}/api/auth/me`` with the token and uses the answer.
    Successful lookups are kept in ``_tokenCache`` for ``_TOKEN_CACHE_TTL`` seconds.

    Args:
        token: Raw bearer token taken from the request.

    Returns:
        A dict with ``userId``, ``phone``, ``name`` and ``avatarUrl`` (the dev-mode
        result carries no ``avatarUrl``), or ``None`` when the token is empty, is
        rejected by MindPass, or the verification call fails for any reason.
    """
    if not token:
        return None

    # Dev mode: the token itself is used as userId, i.e. authentication is bypassed.
    # Only an explicitly truthy value enables it, so DEEPVIEW_AUTH_DEV_MODE=0 / false /
    # no / off can no longer switch authentication off by accident.
    devMode = os.environ.get("DEEPVIEW_AUTH_DEV_MODE", "").strip().lower()
    if devMode not in ("", "0", "false", "no", "off"):
        logger.warning("[DeepviewSSE] Auth dev mode enabled, using token as userId")
        return {"userId": token, "phone": "", "name": "开发用户"}

    # Serve from the in-memory cache while the entry is still fresh.
    now = time.time()
    cached = _tokenCache.get(token)
    if cached:
        if cached[1] > now:
            return cached[0]
        # Expired: drop it so the cache does not keep dead tokens forever.
        _tokenCache.pop(token, None)

    # Ask MindPass /api/auth/me.
    try:
        timeout = ClientTimeout(total=5)
        async with ClientSession(timeout=timeout) as session:
            async with session.get(
                f"{MINDPASS_BASE_URL}/api/auth/me",
                headers={"Authorization": f"Bearer {token}"},
            ) as resp:
                if resp.status != 200:
                    logger.info("[DeepviewSSE] MindPass returned %d for token verification", resp.status)
                    return None
                data = await resp.json()

        userId = data.get("id") if isinstance(data, dict) else None
        if userId is None:
            # Without an id there is no identity to attach events or storage to.
            logger.error("[DeepviewSSE] MindPass verification failed: %s", "response has no 'id' field")
            return None

        user = {
            "userId": userId,
            "phone": data.get("phone", ""),
            "name": data.get("name", ""),
            "avatarUrl": data.get("avatarUrl", ""),
        }

        # Evict every expired entry before inserting, which bounds the cache to the
        # tokens actually seen during the last TTL window.
        for staleToken in [t for t, entry in _tokenCache.items() if entry[1] <= now]:
            _tokenCache.pop(staleToken, None)
        _tokenCache[token] = (user, now + _TOKEN_CACHE_TTL)
        return user
    except Exception as e:
        # Boundary handler: network errors, timeouts and malformed responses all
        # mean "not verified".
        logger.error("[DeepviewSSE] MindPass verification failed: %s", e)
        return None
def _verifyToken(token: str) -> Optional[dict]:
    """
    Synchronous compatibility wrapper around ``_verifyTokenAsync``.

    Fallback for synchronous call sites; code running inside an aiohttp handler
    should await ``_verifyTokenAsync`` directly.

    Args:
        token: Raw bearer token.

    Returns:
        The user dict produced by ``_verifyTokenAsync``, or ``None`` when
        verification fails or any error occurs while running it.
    """
    try:
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: drive the coroutine on a private loop.
            # asyncio.get_event_loop() is avoided because calling it without a running
            # loop is deprecated and raises on recent Python versions, which would make
            # this wrapper always return None.
            privateLoop = asyncio.new_event_loop()
            try:
                return privateLoop.run_until_complete(_verifyTokenAsync(token))
            finally:
                privateLoop.close()

        # A loop is already running in this thread, so the coroutine cannot be driven
        # here. Run it on its own loop in a helper thread and wait for the result.
        # NOTE: this blocks the calling thread until the helper finishes.
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            future = pool.submit(asyncio.run, _verifyTokenAsync(token))
            return future.result(timeout=6)
    except Exception as e:
        logger.error("[DeepviewSSE] Sync token verify failed: %s", e)
        return None
# Tool name -> user-readable label shown by the front end.
_STEP_LABELS = dict([
    ("read_file", "📖 正在查阅知识库"),
    ("search_files", "🔍 正在搜索相关文件"),
    ("web_search", "🌐 正在搜索网络"),
    ("web_extract", "📄 正在提取网页内容"),
    ("terminal", "⚙️ 正在执行命令"),
    ("vision_analyze", "👁️ 正在分析图片"),
    ("patch", "✏️ 正在修改文件"),
    ("write_file", "💾 正在写入文件"),
])


def _translate_step(toolName: str, preview: str) -> str:
    """Translate a Hermes tool name into the text displayed by the front end.

    Tools listed in ``_STEP_LABELS`` map to their fixed label. Any other tool
    falls back to a wrench emoji followed by ``preview``, or by the tool name
    when ``preview`` is empty.
    """
    if toolName in _STEP_LABELS:
        return _STEP_LABELS[toolName]
    fallbackText = preview or toolName
    return f"🔧 {fallbackText}"
# CORS headers attached to handler responses.
_CORS_HEADERS = dict(
    (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Methods", "GET, POST, OPTIONS"),
        ("Access-Control-Allow-Headers", "Content-Type, Authorization"),
        ("Access-Control-Max-Age", "600"),
    )
)
class DeepviewSSEServer:
"""
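Dedicated SSE server for the DeepView consultation-agent front end.
Responsibilities:
1. Manage long-lived SSE connections for multiple consultants and multiple browser tabs.
2. Translate low-level Hermes AIAgent callbacks into the 7 SSE events of the closed event set.
3. Provide business APIs (chat, file upload, pinning conclusions).
Isolation model:
- userId (identity layer) ← MindPass JWT sub; decides who an event is pushed to.
- connId (connection layer) ← unique per browser tab; several tabs of one user coexist.
- chatId (conversation layer) ← unique per chat tab; the front end buckets rendering by chatId.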
"""
def __init__(self, host: str = "127.0.0.1", port: int = 8643):
    """Record the bind address, create the connection registry and initialise the tables.

    Args:
        host: Address the HTTP server binds to.
        port: TCP port the HTTP server listens on.
    """
    self._host, self._port = host, port
    self._app: Optional["web.Application"] = None
    self._runner: Optional["web.AppRunner"] = None

    # userId -> {connId -> asyncio.Queue}.
    # Each browser tab of a user owns its own queue; events are broadcast to all of them.
    self._sseClients: Dict[str, Dict[str, "asyncio.Queue[Optional[str]]"]] = {}

    # Storage initialisation, in this order: pin board, material files,
    # persisted reports, extended user profiles.
    self._initPinsDb()
    self._initMaterialsDb()
    self._initReportsDb()
    self._initProfilesDb()
# ──────────────────────────────────────────────
# 存储辅助方法
# ──────────────────────────────────────────────
def _getUserStorageDir(self, userId: str) -> str:
"""返回当前用户的专属存储沙箱根路径,不存在时自动创建。"""
storageDir = os.getenv("DEEPVIEW_STORAGE_DIR", os.path.expanduser("~/Downloads/Coding/医生助理智能体/backend/storage"))
userDir = os.path.join(storageDir, "users", userId)
os.makedirs(userDir, exist_ok=True)
return userDir
def _getOrgStorageDir(self, orgId: str) -> str:
"""返回机构的存储沙箱根路径,不存在时自动创建。"""
storageDir = os.getenv("DEEPVIEW_STORAGE_DIR", os.path.expanduser("~/Downloads/Coding/医生助理智能体/backend/storage"))
orgDir = os.path.join(storageDir, "orgs", orgId)
os.makedirs(orgDir, exist_ok=True)
return orgDir
# ──────────────────────────────────────────────
# ──────────────────────────────────────────────
def _pushEvent(self, userId: str, eventName: str, data: dict) -> None:
"""向指定用户的所有 SSE 连接(标签页)推送事件。"""
conns = self._sseClients.get(userId)
if not conns:
logger.debug("SSE push to disconnected userId=%s, event=%s dropped", userId, eventName)
return
payload = f"event: {eventName}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
for connId, q in list(conns.items()):
try:
q.put_nowait(payload)
except asyncio.QueueFull:
logger.warning("SSE queue full: userId=%s connId=%s, dropping event=%s",
userId, connId, eventName)
def _broadcastEvent(self, eventName: str, data: dict) -> None:
"""向所有已连接的用户推送事件(用于全局通知如 wiki:updated"""
for userId in list(self._sseClients.keys()):
self._pushEvent(userId, eventName, data)
# ──────────────────────────────────────────────
# Hermes AIAgent 回调工厂
# ──────────────────────────────────────────────
def _makeStreamDeltaCallback(self, userId: str, chatId: str, loop: asyncio.AbstractEventLoop):
"""
创建 stream_delta_callback。
翻译规则:
- text (非 None) → agent:chunk
- None → 丢弃CLI 显示层哨兵)
"""
def callback(delta):
if delta is None:
return # 丢弃 CLI 哨兵信号
loop.call_soon_threadsafe(
self._pushEvent, userId, "agent:chunk", {
"chatId": chatId,
"text": delta,
}
)
return callback
def _makeToolProgressCallback(self, userId: str, chatId: str, loop: asyncio.AbstractEventLoop):
"""
创建 tool_progress_callback。
翻译规则:
- tool.started → agent:thinking
- tool.completed → 丢弃(前端不需要)
- reasoning.available → 丢弃(与 stream_delta 内容重复)
- _thinking → 丢弃(子代理中继,本项目不涉及)
"""
def callback(eventType, name, preview=None, args=None, **kwargs):
if eventType == "tool.started":
# 过滤内部事件
if name and name.startswith("_"):
return
message = _translate_step(name, preview)
loop.call_soon_threadsafe(
self._pushEvent, userId, "agent:thinking", {
"chatId": chatId,
"step": name,
"message": message,
}
)
# tool.completed, reasoning.available, _thinking → 全部丢弃
return callback
# ──────────────────────────────────────────────
# 认证辅助
# ──────────────────────────────────────────────
async def _extractUser(self, request: "web.Request") -> Optional[dict]:
    """
    Resolve the calling user from the request by delegating token checks to MindPass.

    Lookup order:
    1. ``Authorization: Bearer <token>`` header (used by HTTP POST requests).
    2. ``?token=<token>`` query parameter (used by EventSource, which cannot send
       custom headers).

    A header token that fails verification falls through to the query parameter.
    Returns the verified user dict, or None.
    """
    # 1. Header
    headerValue = request.headers.get("Authorization", "")
    if headerValue.startswith("Bearer "):
        fromHeader = await _verifyTokenAsync(headerValue[7:].strip())
        if fromHeader:
            return fromHeader

    # 2. Query parameter
    queryToken = request.query.get("token", "").strip()
    if not queryToken:
        return None
    return await _verifyTokenAsync(queryToken)
# ──────────────────────────────────────────────
# HTTP Handlers
# ──────────────────────────────────────────────
def _initProfilesDb(self) -> None:
    """Ensure the deepview_user_profiles table (extra user profile data) exists.

    Creates the table when missing and then tries to add the ``real_name`` column
    for databases created before that column existed. Any failure is logged as a
    warning and otherwise ignored, so start-up continues without the table.
    """
    try:
        from hermes_state import SessionDB
        db = SessionDB()
        with db._lock:
            db._conn.execute("""
CREATE TABLE IF NOT EXISTS deepview_user_profiles (
user_id TEXT PRIMARY KEY,
real_name TEXT,
company TEXT,
role TEXT,
voice_url TEXT,
updated_at REAL NOT NULL
)
""")
            # Upgrade path: add real_name when an older table lacks it. The statement
            # fails when the column already exists, which is the expected case.
            # Catch Exception (not a bare except) so KeyboardInterrupt / SystemExit
            # are no longer swallowed here.
            try:
                db._conn.execute("ALTER TABLE deepview_user_profiles ADD COLUMN real_name TEXT")
            except Exception as alterErr:
                logger.debug("[DeepviewSSE] real_name column not added (likely already present): %s", alterErr)
            db._conn.commit()
    except Exception as e:
        logger.warning("[DeepviewSSE] Failed to init profiles table: %s", e)
async def _handleProfileGet(self, request: "web.Request") -> "web.Response":
    """GET /deepview/profile

    Return the caller's extended profile (realName, company, role, voiceUrl).
    All four fields are empty strings when no profile row exists.
    Responds 401 without a valid user and 500 when the lookup raises.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    # JSON field names, in the same order as the selected columns.
    fieldNames = ("realName", "company", "role", "voiceUrl")
    try:
        from hermes_state import SessionDB
        profileDb = SessionDB()
        with profileDb._lock:
            cursor = profileDb._conn.execute(
                "SELECT real_name, company, role, voice_url FROM deepview_user_profiles WHERE user_id = ?",
                (user["userId"],)
            )
            row = cursor.fetchone()
        if row:
            profile = {key: row[idx] for idx, key in enumerate(fieldNames)}
        else:
            profile = {key: "" for key in fieldNames}
        return web.json_response(profile, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to get profile: %s", e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
async def _handleProfilePost(self, request: "web.Request") -> "web.Response":
    """POST /deepview/profile

    Body: { realName?, company?, role?, voiceUrl? }

    Insert or update the caller's extended profile. Missing fields are stored as
    empty strings. Responds 401 without a valid user, 400 when the body is not a
    JSON object, and 500 when the write raises.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    # A malformed body is a client error: answer 400 (as _handleChat does) rather
    # than letting it surface as a 500.
    try:
        data = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(data, dict):
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    try:
        real_name = data.get("realName", "")
        company = data.get("company", "")
        role = data.get("role", "")
        voice_url = data.get("voiceUrl", "")
        from hermes_state import SessionDB
        db = SessionDB()
        with db._lock:
            # Upsert keyed on user_id.
            db._conn.execute("""
INSERT INTO deepview_user_profiles (user_id, real_name, company, role, voice_url, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(user_id) DO UPDATE SET
real_name=excluded.real_name,
company=excluded.company,
role=excluded.role,
voice_url=excluded.voice_url,
updated_at=excluded.updated_at
""", (user["userId"], real_name, company, role, voice_url, time.time()))
            db._conn.commit()
        return web.json_response({"success": True}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to set profile: %s", e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
async def _handleHistory(self, request: "web.Request") -> "web.Response":
    """GET /deepview/chat/history?chatId=xxx

    Return the stored conversation for ``chatId`` as {"messages": [...]}.
    Responds 401 without a valid user, 400 without chatId, 500 when loading raises.

    NOTE(review): the lookup is keyed on chatId only; nothing here checks that the
    chat belongs to the authenticated user — confirm whether SessionDB enforces it.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    chatId = request.query.get("chatId", "").strip()
    if chatId == "":
        return web.json_response({"error": "chatId required"}, status=400, headers=_CORS_HEADERS)

    try:
        from hermes_state import SessionDB
        # Per the original note, SessionDB yields a list of {"role": ..., "content": ...} dicts.
        conversation = SessionDB().get_messages_as_conversation(chatId)
        responseBody = {"messages": conversation}
        return web.json_response(responseBody, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to load history for chatId=%s: %s", chatId, e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
async def _handleHealth(self, request: "web.Request") -> "web.Response":
    """GET /deepview/health

    Report service liveness plus the number of users with at least one open SSE
    connection and the total number of open connections. No authentication.
    """
    clientCount = sum(len(conns) for conns in self._sseClients.values())
    userCount = len(self._sseClients)
    # Send the same CORS headers as every other handler so a browser front end on
    # another origin can read the health status too.
    return web.json_response({
        "status": "ok",
        "service": "deepview-sse",
        "users": userCount,
        "connections": clientCount,
    }, headers=_CORS_HEADERS)
async def _handleSSEConnect(self, request: "web.Request") -> "web.StreamResponse":
    """
    GET /deepview/events?token=xxx&connId=yyy

    Open a long-lived SSE connection; called by the front-end GlobalSSEService.
    - token: MindPass JWT, passed as a query parameter because EventSource cannot
      send custom headers.
    - connId: connection id generated by the front end (distinguishes several tabs
      of the same user); one is generated here when missing.

    The handler registers a queue for (userId, connId), sends a ``connected`` event,
    then forwards queued frames until it receives the ``None`` disconnect sentinel
    or the connection is lost. A heartbeat comment is written after 30s of silence.
    """
    user = await self._extractUser(request)
    if not user:
        # CORS headers included so a cross-origin front end can read the 401.
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)
    userId = user["userId"]
    connId = request.query.get("connId", "").strip()
    if not connId:
        connId = f"conn_{int(time.time())}_{uuid.uuid4().hex[:6]}"

    # Register this connection's queue under userId without touching other tabs.
    q: "asyncio.Queue[Optional[str]]" = asyncio.Queue(maxsize=500)
    self._sseClients.setdefault(userId, {})[connId] = q

    headers = {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        **_CORS_HEADERS,
    }
    response = web.StreamResponse(status=200, headers=headers)
    try:
        # prepare() and the first write sit inside the try so that a client that
        # vanishes immediately still gets its queue removed by the finally block.
        await response.prepare(request)
        # Connection acknowledgement.
        connectData = {"userId": userId, "connId": connId, "name": user.get("name", "")}
        connectPayload = f"event: connected\ndata: {json.dumps(connectData, ensure_ascii=False)}\n\n"
        await response.write(connectPayload.encode("utf-8"))
        logger.info("[DeepviewSSE] User %s (%s) connected via connId=%s",
                    user.get("name", "?"), userId[:8], connId)
        while True:
            try:
                payload = await asyncio.wait_for(q.get(), timeout=30.0)
            except asyncio.TimeoutError:
                # Keep-alive heartbeat (SSE comment line).
                await response.write(b": heartbeat\n\n")
                continue
            if payload is None:
                break  # disconnect sentinel
            await response.write(payload.encode("utf-8"))
    except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError, OSError):
        logger.info("[DeepviewSSE] User %s connId=%s disconnected (connection lost)",
                    userId[:8], connId)
    finally:
        # Remove only this connection. The identity check matters when the same
        # connId has reconnected meanwhile: the registry then holds the newer queue,
        # and this stale handler must not delete it.
        conns = self._sseClients.get(userId)
        if conns is not None and conns.get(connId) is q:
            del conns[connId]
            if not conns:
                del self._sseClients[userId]
        logger.info("[DeepviewSSE] connId=%s cleaned up (user %s remaining conns: %d)",
                    connId, userId[:8],
                    len(self._sseClients.get(userId, {})))
    return response
async def _handleChat(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/chat
    Body: { chatId, text, contextId?, doctorId? }
    Auth: Authorization: Bearer <MindPass JWT>

    HTTP only acknowledges receipt ({"received": true}); the conversation runs in a
    background task and its results are delivered over SSE.
    The userId is taken from the verified token, never from the request body.

    Responds 401 without a valid user and 400 when the body is not a JSON object or
    chatId / text are missing or not strings.

    NOTE(review): chatId is client-supplied and is not checked against the
    authenticated user here — confirm ownership is enforced elsewhere.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)
    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(body, dict):
        # Valid JSON that is not an object (list, string, ...) has no fields to read.
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    # userId comes from the token, not from the front end.
    userId = user["userId"]
    chatId = body.get("chatId", "")
    text = body.get("text", "")
    contextId = body.get("contextId", "deepview")
    doctorId = body.get("doctorId", "doc_001")
    # Non-string values (e.g. null) are treated like missing ones instead of crashing.
    chatId = chatId.strip() if isinstance(chatId, str) else ""
    text = text.strip() if isinstance(text, str) else ""
    if not chatId or not text:
        return web.json_response({"error": "Missing chatId or text"},
                                 status=400, headers=_CORS_HEADERS)

    # Start the conversation without waiting for the AI. The event loop keeps only
    # weak references to tasks, so hold a strong one until the task finishes;
    # otherwise it may be garbage-collected mid-run.
    pendingTasks = getattr(self, "_chatTasks", None)
    if pendingTasks is None:
        pendingTasks = self._chatTasks = set()
    task = asyncio.create_task(self._runChat(userId, chatId, text, contextId, doctorId))
    pendingTasks.add(task)
    task.add_done_callback(pendingTasks.discard)

    return web.json_response(
        {"received": True},
        headers=_CORS_HEADERS,
    )
async def _runChat(self, userId: str, chatId: str, text: str, contextId: str, doctorId: str = "doc_001") -> None:
    """
    Run one Hermes AIAgent conversation turn in the background (dual context-pack routing).

    Nothing is returned over HTTP. The final answer is pushed as an SSE ``agent:done``
    event; any exception is logged and pushed as ``agent:error``.

    The system prompt is the SKILL.md content (or a short built-in default) plus a
    section chosen by the contextId prefix: ``recording:``, ``client:`` or generic.

    NOTE(review): clientId / asrId are taken from contextId and interpolated into the
    file paths given to the agent without validation — confirm they are sanitised upstream.
    """
    try:
        from run_agent import AIAgent
        from hermes_state import SessionDB

        db = SessionDB()
        loop = asyncio.get_event_loop()

        # Storage root from the environment, plus the user sandbox and the
        # organisation knowledge area.
        storageDir = os.getenv("DEEPVIEW_STORAGE_DIR", os.path.expanduser("~/Downloads/Coding/医生助理智能体/backend/storage"))
        userDir = self._getUserStorageDir(userId)
        orgId = "org_001"
        orgDir = self._getOrgStorageDir(orgId)
        platformDir = os.path.join(storageDir, "platform")

        # 1. Load the single, context-independent SKILL.md (three directories up).
        packageRoot = __file__
        for _ in range(3):
            packageRoot = os.path.dirname(packageRoot)
        skillPath = os.path.join(packageRoot, "skills", "deepview_assistant", "SKILL.md")

        systemPrompt = "你是深维面诊智能军师。\n\n"
        if os.path.exists(skillPath):
            with open(skillPath, "r", encoding="utf-8") as skillFile:
                systemPrompt = skillFile.read()
        systemPrompt = systemPrompt.replace("{{STORAGE_DIR}}", storageDir)

        # 2. Route the context pack by the contextId prefix.
        if contextId.startswith("recording:"):
            _, recordingId = contextId.split(":", 1)
            # Expected form "clientId/asrId"; without a slash the client is unknown.
            segments = recordingId.split("/")
            if len(segments) > 1:
                clientId = segments[0]
            else:
                clientId = "unknown_client"
            asrId = segments[-1]
            asrPath = f"{userDir}/clients/{clientId}/history/{asrId}.md"
            systemPrompt += f"""
## 🎙️ 当前上下文模式:单次面诊录音复盘
- 录音 ASR 文件:{asrPath}
- 医生风格档案:{userDir}/doctor_profile.md
- 企业知识库目录:{orgDir}/wiki/
- 平台规则参考:{platformDir}/wiki/
### 你的工作重心
基于这一次面诊录音,分析信任断点、沟通体征、改进建议。
不要读取该客户的完整 profile.md那是全景档案模式的职责
"""
        elif contextId.startswith("client:"):
            clientId = contextId.partition(":")[2]
            systemPrompt += f"""
## 👤 当前上下文模式:客户全景档案
- 客户档案目录:{userDir}/clients/{clientId}/
- 核心档案:{userDir}/clients/{clientId}/profile.md
- 历史录音目录:{userDir}/clients/{clientId}/history/
- 医生风格档案:{userDir}/doctor_profile.md
- 企业知识库目录:{orgDir}/wiki/
- 平台规则参考:{platformDir}/wiki/
### 你的工作重心
基于该客户的全生命周期数据,提供诊前策略、社交杠杆分析、跨品类破冰方案。
优先读取 profile.md 获取全景概览,必要时深入 history/ 追溯原始录音细节。
"""
        else:
            systemPrompt += f"\n\n## 通用模式\n企业知识库目录:{orgDir}/wiki/\n平台规则参考:{platformDir}/wiki/\n"

        # 3. Load the stored conversation so the turn is stateful.
        history = db.get_messages_as_conversation(chatId)

        agent = AIAgent(
            model=os.getenv("DEEPVIEW_MODEL", "gemini-pro-vertex"),
            enabled_toolsets=["file"],  # file toolset only
            stream_delta_callback=self._makeStreamDeltaCallback(userId, chatId, loop),
            tool_progress_callback=self._makeToolProgressCallback(userId, chatId, loop),
            quiet_mode=True,
            platform="deepview",
            session_id=chatId,
            session_db=db,
            user_id=userId,
        )

        def _converse():
            # Runs in the default executor so the event loop stays responsive.
            return agent.run_conversation(
                user_message=text,
                system_message=systemPrompt,
                conversation_history=history,
            )

        result = await loop.run_in_executor(None, _converse)
        finalResponse = result.get("final_response", "") or result.get("error", "(未生成回答)")

        # agent:done marks the end of this turn.
        donePayload = {
            "chatId": chatId,
            "fullAnswer": finalResponse,
        }
        self._pushEvent(userId, "agent:done", donePayload)
    except Exception as e:
        logger.error("[DeepviewSSE] Chat error for %s/%s: %s", userId, chatId, e, exc_info=True)
        errorPayload = {
            "chatId": chatId,
            "message": f"处理失败:{str(e)}",
        }
        self._pushEvent(userId, "agent:error", errorPayload)
async def _handleReportGenerate(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/report/generate
    Body: { contextId, doctorId?, presetReportId? }

    Generate a report synchronously in two stages and persist it:
      1. A Hermes AIAgent run produces a free-form markdown analysis.
      2. A chat-completion call with a JSON-object response format converts that
         markdown into the report JSON. The structure is requested through the
         system prompt only.
    No retry is performed: the first failing stage yields a 500 response.
    A failure to persist the report is logged but does not fail the request.

    When ``presetReportId`` is given, the existing row with that id (owned by the
    caller) is completed; otherwise a new ``rep_<8 hex>`` id is created.

    Responds 401 without a valid user and 400 when the body is not a JSON object or
    contextId is missing.

    NOTE(review): clientId / asrId come from contextId and are interpolated into file
    paths in the prompt without validation — confirm they are sanitised upstream.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)
    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(body, dict):
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    contextId = body.get("contextId", "")
    presetReportId = body.get("presetReportId")
    if not contextId:
        return web.json_response({"error": "Missing contextId"}, status=400, headers=_CORS_HEADERS)

    # Identity comes from the verified user. The user dict built by token
    # verification exposes "userId" (there is no "sub" key), so reading "sub" made
    # every report land under the user "unknown" and point at the wrong sandbox.
    userId = user["userId"]
    userDir = self._getUserStorageDir(userId)
    orgId = user.get("org", "org_001")
    orgDir = self._getOrgStorageDir(orgId)
    storageDir = os.getenv("DEEPVIEW_STORAGE_DIR", os.path.expanduser("~/Downloads/Coding/医生助理智能体/backend/storage"))
    platformDir = os.path.join(storageDir, "platform")

    # Base prompt: SKILL.md when present, otherwise a short built-in default.
    systemPrompt = "你是深维面诊智能军师。\n"
    skillPath = os.path.join(
        os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
        "skills", "deepview_assistant", "SKILL.md",
    )
    if os.path.exists(skillPath):
        with open(skillPath, "r", encoding="utf-8") as f:
            systemPrompt = f.read()
    systemPrompt = systemPrompt.replace("{{STORAGE_DIR}}", storageDir)
    # Compatibility: if the prompt still contains the storage-level wiki path,
    # point it at the organisation wiki instead.
    systemPrompt = systemPrompt.replace(f"{storageDir}/wiki/", f"{orgDir}/wiki/")
    systemPrompt += f"\n\n## 规则遵循\n企业知识库:{orgDir}/wiki/\n平台规则:{platformDir}/wiki/\n"

    if contextId.startswith("recording:"):
        recordingId = contextId.split(":", 1)[1]
        parts = recordingId.split("/")
        if len(parts) > 1:
            # Legacy (archived) form: recording:clientId/asrId
            clientId = parts[0]
            asrId = parts[-1]
            asrPath = f"{userDir}/clients/{clientId}/history/{asrId}.md"
            clientProfilePath = f"{userDir}/clients/{clientId}/profile.md"
        else:
            # Inbox form: recording:asrId, with the ASR file under inbox/<asrId>/
            asrId = parts[0]
            asrPath = f"{userDir}/inbox/{asrId}/asr.md"
            clientProfilePath = None
        systemPrompt += f"\n## 🎙️ 第1模式单条面诊录音复盘\n你的唯一任务是提取基于这通录音的战术复盘。\n录音路径:{asrPath}\n医生档案参考:{userDir}/doctor_profile.md"
        if clientProfilePath and os.path.exists(clientProfilePath):
            systemPrompt += f"\n客户档案参考:{clientProfilePath}"
        # Speaker-inference instructions apply to every recording.
        systemPrompt += "\n\n🚨 【系统硬约束Speaker 天然推断指令】"
        systemPrompt += "\n原生录音 ASR 中仅含有 Speaker_1、Speaker_2 等无感情编号。请根据对话上下文(谁在做专业科普与处方,谁在表述容貌焦虑与顾虑)自然反演出真实的医生和客户对应关系。"
        systemPrompt += "\n在输出所有报告正文时,请直接使用真实姓名,彻底抹除 Speaker_X 痕迹。"
    else:
        clientId = contextId.split(":", 1)[1] if ":" in contextId else contextId
        systemPrompt += f"\n## 👤 第2模式客户全景档案战略\n你的唯一任务是生成该客户的全景战略洞察报告。\n档案路径:{userDir}/clients/{clientId}/profile.md\n历史录音目录:{userDir}/clients/{clientId}/history/\n医生风格:{userDir}/doctor_profile.md"

    # ── Stage 1, content engine: Hermes AIAgent (markdown in, markdown out) ──
    # Produces the rich-text analysis only; formatting is handled in stage 2.
    contentChecklist = """
## 输出必须包含的章节(缺一不可)
1. 方案接纳与信任转化评估(核心结果 + 信任指数百分制 + AI洞察段落
2. 面诊体征数据(医患说话时间比 + 降维科普能力评价并引用比喻原句 + 信任温度曲线各阶段描述)
3. 信任断点溯源(精确到录音时间戳 + 原声对话还原 + 核心病灶诊断标签 + 深度改进分析段落)
4. D.E.E.P. 雷达评估D诊断/E1共情/E2科普/P处方 四维1-5分评分及各维度鉴定说明 + 以军师口吻给医生的犀利忠告)
5. 行动处方(外部行动轨道:含节点名/动作/话术/目的 + 内部觉察轨道:含改变/动作/话术)
6. 报告编号(DW-AMXG-日期)、客户姓名、面诊医生、核心诉求、最终接纳度、面诊时长秒数
"""
    systemPrompt += contentChecklist

    from run_agent import AIAgent
    from hermes_state import SessionDB
    db = SessionDB()
    loop = asyncio.get_running_loop()
    try:
        agent = AIAgent(
            model=os.getenv("DEEPVIEW_MODEL", "gemini-pro-vertex"),
            enabled_toolsets=["file"],
            quiet_mode=True,
            platform="deepview",
            session_id=str(uuid.uuid4()),
            session_db=db,
            user_id=userId,
        )
        # The agent call is blocking; run it in the default executor.
        result = await loop.run_in_executor(
            None,
            lambda: agent.run_conversation(
                user_message="请根据给定的上下文文件生成完整的面诊沟通X光片分析报告。按照章节 checklist 确保不遗漏任何模块。",
                system_message=systemPrompt,
            ),
        )
        mdReport = result.get("final_response", "").strip()
        logger.info("[DeepviewSSE] Stage 1 (Hermes md2md) done, length=%s chars", len(mdReport))
        # Anything shorter than 100 characters is treated as an empty report.
        if not mdReport or len(mdReport) < 100:
            return web.json_response({"error": "Stage 1 报告内容为空"}, status=500, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("[DeepviewSSE] Stage 1 error: %s", e)
        return web.json_response({"error": f"Stage 1 failed: {e}"}, status=500, headers=_CORS_HEADERS)

    # ── Stage 2, format engine: chat completion through the configured gateway ──
    # Pure conversion: no new content, only structural alignment.
    try:
        from openai import OpenAI
        litellmClient = OpenAI(
            base_url=os.getenv("GEMINI_BASE_URL", "http://127.0.0.1:4000/v1"),
            api_key=os.getenv("GEMINI_API_KEY", "sk-placeholder"),
        )
        stage2Response = await loop.run_in_executor(
            None,
            lambda: litellmClient.chat.completions.create(
                model=os.getenv("DEEPVIEW_STAGE2_MODEL", "qwen-plus"),
                messages=[
                    {"role": "system", "content": """你是一个 JSON 格式转换器。将用户提供的面诊分析报告精确转换为以下 JSON 结构。
忠实保留原文内容,不要增删改。必须严格使用以下字段名(不要用中文或其他名称)。
【类型约束·极其重要】
- duration: 纯整数(秒数),如 1800不要带单位
- trustIndex: 纯整数 0-100如 65不要带%
- radar 里的 d/e1/e2/p: 纯浮点数 1.0-5.0,如 4.5,绝不能写成"4.5/5 分""4.5分"
顶层: reportCode, patientName, doctorName, coreDemand, acceptance, duration, xray
xray.module1: result, trustIndex, coreInsight
xray.module2: ratio, metaphorScore, curveDesc(字符串数组)
xray.module3: breakpoints(数组,每项含timestamp/contextText数组/diagnosis), benchmark
xray.module4: radar(含d/d_comment/e1/e1_comment/e2/e2_comment/p/p_comment), expertAdvice
xray.module5: track1(数组,每项含node/action/strategy/purpose), track2(数组,每项含change/action/strategy)"""},
                    {"role": "user", "content": mdReport}
                ],
                response_format={"type": "json_object"},
                max_tokens=8192,
                user=userId,
                extra_body={
                    "metadata": {
                        "deepview_task": contextId,
                        "deepview_stage": "stage2_json",
                        "deepview_user": userId,
                    }
                },
            )
        )
        jsonOutput = stage2Response.choices[0].message.content.strip()
        parsedData = json.loads(jsonOutput)
        logger.info(f"[DeepviewSSE] Stage 2 (json_object) done, keys={list(parsedData.keys())}")
        xray = parsedData.get('xray', {})
        for mk in ['module1', 'module2', 'module3', 'module4', 'module5']:
            logger.info(f"[DeepviewSSE] xray.{mk}: {'' if xray.get(mk) else '❌ MISSING'}")

        # ── Persistence: write the report to the DB ──
        reportId = presetReportId if presetReportId else "rep_" + uuid.uuid4().hex[:8]
        # Inject the real identifiers instead of whatever the model produced.
        parsedData['reportCode'] = f"DW-AMXG-{reportId[4:].upper()}"
        parsedData['id'] = reportId
        parsedData['context_id'] = contextId
        try:
            reportDb = SessionDB()
            reportJson = json.dumps(parsedData, ensure_ascii=False)
            with reportDb._lock:
                if presetReportId:
                    # Scope the update to the caller's own row so a client-supplied
                    # presetReportId cannot overwrite another user's report.
                    cursor = reportDb._conn.execute(
                        "UPDATE deepview_reports_v2 SET status='completed', report_json=?, created_at=? WHERE report_id=? AND user_id=?",
                        (reportJson, time.time(), presetReportId, userId)
                    )
                    if getattr(cursor, "rowcount", 1) == 0:
                        logger.warning("[DeepviewSSE] No report row %s owned by user %s; nothing updated",
                                       presetReportId, userId)
                else:
                    reportDb._conn.execute(
                        "INSERT OR REPLACE INTO deepview_reports_v2 (report_id, context_id, user_id, report_json, created_at, status) VALUES (?, ?, ?, ?, ?, 'completed')",
                        (reportId, contextId, userId, reportJson, time.time())
                    )
                reportDb._conn.commit()
            logger.info(f"[DeepviewSSE] Report persisted to DB: {contextId} -> {reportId}")
        except Exception as dbErr:
            # Best effort: the generated report is still returned to the caller.
            logger.error(f"[DeepviewSSE] DB persist failed: {dbErr}")

        safeBody = json.dumps({"success": True, "reportId": reportId, "data": parsedData}, ensure_ascii=False)
        return web.Response(text=safeBody, content_type="application/json", headers=_CORS_HEADERS)
    except Exception as e:
        logger.error(f"[DeepviewSSE] Stage 2 error: {e}")
        return web.json_response({"error": f"Stage 2 (json conversion) failed: {e}"}, status=500, headers=_CORS_HEADERS)
async def _handleReportGet(self, request: "web.Request") -> "web.Response":
    """
    GET /deepview/report/get?reportId=rep_xxx

    Read a persisted report JSON from the DB. OPTIONS requests are answered with
    the CORS headers only. Responds 401 without a valid user, 400 without reportId,
    404 when no row matches, 500 when reading raises.

    NOTE(review): the query filters on report_id only, so any authenticated user can
    read a report by id — confirm whether a user_id filter is intended.
    """
    if request.method == "OPTIONS":
        return web.Response(status=200, headers=_CORS_HEADERS)

    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    reportId = request.query.get("reportId", "")
    if reportId == "":
        return web.json_response({"error": "Missing reportId"}, status=400, headers=_CORS_HEADERS)

    try:
        from hermes_state import SessionDB
        reportDb = SessionDB()
        with reportDb._lock:
            row = reportDb._conn.execute(
                "SELECT report_json, created_at, client_id FROM deepview_reports_v2 WHERE report_id = ?",
                (reportId,)
            ).fetchone()
        if not row:
            notFound = {"error": "Report not found", "reportId": reportId}
            return web.json_response(notFound, status=404, headers=_CORS_HEADERS)

        storedJson, archivedClientId = row[0], row[2]
        reportData = json.loads(storedJson)
        # Override reportCode / id so older reports with model-invented codes are corrected.
        reportData["reportCode"] = f"DW-AMXG-{reportId[4:].upper()}"
        reportData["id"] = reportId
        # Expose clientId so the front end can tell whether the report is archived.
        if archivedClientId:
            reportData["clientId"] = archivedClientId

        envelope = {"success": True, "data": reportData}
        return web.Response(
            text=json.dumps(envelope, ensure_ascii=False),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )
    except Exception as e:
        logger.error(f"[DeepviewSSE] Report get error: {e}")
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
async def _handleReportsList(self, request: "web.Request") -> "web.Response":
    """
    GET /deepview/reports/list

    Return every report row of the current user, newest first, including
    placeholder rows that are still being generated. A row whose report_json cannot
    be parsed is listed with an empty ``data`` object.
    Responds 401 without a valid user and 500 when the query raises.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)
    try:
        from hermes_state import SessionDB
        db = SessionDB()
        with db._lock:
            cursor = db._conn.execute(
                "SELECT report_id, context_id, report_json, created_at, status, client_id FROM deepview_reports_v2 WHERE user_id = ? ORDER BY created_at DESC",
                (user["userId"],)
            )
            rows = cursor.fetchall()
        reports = []
        for row in rows:
            rep_id, ctx_id, r_json, c_time, status, client_id = row
            try:
                data = json.loads(r_json)
            except (TypeError, ValueError):
                # TypeError: NULL / non-text column value; ValueError: malformed JSON
                # (json.JSONDecodeError is a ValueError). A bare except would also
                # have swallowed KeyboardInterrupt / SystemExit.
                data = {}
            reports.append({
                "id": rep_id,
                "contextId": ctx_id,
                "status": status,
                "createdAt": c_time,
                "clientId": client_id,
                "data": data
            })
        return web.json_response({"success": True, "reports": reports}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error(f"[DeepviewSSE] Reports list error: {e}")
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
async def _handleMaterialsUploadToken(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/materials/upload_token
    Body: { filename? }

    Return a pre-signed URL for a direct OSS upload. The object key is
    ``deepview-raw/<userId>/<random hex><original extension>``; the URL is signed
    for PUT with Content-Type application/octet-stream and a 3600 second expiry.

    A missing or unparsable body falls back to the default filename.
    Responds 401 without a valid user and 500 when OSS credentials are not configured.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)

    # Best effort: an unreadable body means "use defaults". Catch Exception rather
    # than a bare except so cancellation and interpreter-exit signals propagate.
    try:
        body = await request.json()
    except Exception:
        body = {}
    if not isinstance(body, dict):
        # Valid JSON that is not an object has no filename field to read.
        body = {}

    filename = body.get("filename", "unknown.pdf")
    ext = os.path.splitext(filename)[1].lower()
    file_uid = uuid.uuid4().hex
    object_key = f"deepview-raw/{user['userId']}/{file_uid}{ext}"

    oss_ak = os.getenv("ALIYUN_ACCESS_KEY_ID", "")
    oss_sk = os.getenv("ALIYUN_ACCESS_KEY_SECRET", "")
    oss_bucket = os.getenv("OSS_BUCKET", "meetings-dev")
    oss_endpoint = os.getenv("OSS_ENDPOINT", "oss-cn-beijing.aliyuncs.com")
    if not all([oss_ak, oss_sk]):
        # Without OSS credentials (e.g. local testing) no URL can be signed.
        return web.json_response({"error": "Server missing OSS credentials (ALIYUN_ACCESS_KEY_ID)"}, status=500, headers=_CORS_HEADERS)

    import oss2
    auth = oss2.Auth(oss_ak, oss_sk)
    bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket)
    # Content-Type must be part of the signature; otherwise the MIME type sent by
    # the front end makes OSS reject the upload with 403.
    headers = {'Content-Type': 'application/octet-stream'}
    put_url = bucket.sign_url("PUT", object_key, 3600, headers=headers)
    # Always hand out an https URL.
    if put_url.startswith("http://"):
        put_url = put_url.replace("http://", "https://", 1)
    return web.json_response({
        "putUrl": put_url,
        "ossKey": object_key,
        "filename": filename
    }, headers=_CORS_HEADERS)
async def _handleMaterialsConfirm(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/materials/confirm

    Called by the front end after its direct PUT to OSS has succeeded.

    Body (JSON): ``{"ossKey": str, "contextId": str = "deepview",
    "filename": str = "unknown.pdf"}``.

    The uploaded object is processed in a background task. When ``contextId``
    starts with ``recording:``, a placeholder row with status 'processing' is
    inserted into ``deepview_reports_v2`` first, and after processing the task
    calls this server's own ``/deepview/report/generate`` endpoint; on success
    a ``report:ready`` event is pushed, on any failure the placeholder is
    marked 'failed'.

    Returns:
        200 ``{"received": True, "reportId": <id or None>}`` immediately,
        400 for an invalid body, 401 when unauthenticated.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)
    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(body, dict):
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    oss_key = body.get("ossKey")
    context_id = body.get("contextId", "deepview")
    filename = body.get("filename", "unknown.pdf")
    if not oss_key:
        return web.json_response({"error": "Missing ossKey"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(context_id, str):
        # e.g. an explicit JSON null; .startswith() below needs a string.
        return web.json_response({"error": "Invalid contextId"}, status=400, headers=_CORS_HEADERS)
    # NOTE(review): ossKey is client-supplied and is not checked against the
    # "deepview-raw/{userId}/" prefix handed out by upload_token. Confirm whether
    # other upload paths exist before enforcing that prefix here.

    try:
        from .deepview_materials import process_uploaded_material_oss
    except ImportError:
        from gateway.platforms.deepview_materials import process_uploaded_material_oss

    user_id = user["userId"]
    is_recording = context_id.startswith("recording:")
    report_id = "rep_" + uuid.uuid4().hex[:8]
    # Read the header now instead of from inside the background task.
    auth_header = request.headers.get("Authorization", "")

    def _mark_report_failed(only_if_processing: bool) -> None:
        """Set the placeholder report to 'failed'; never raises."""
        sql = "UPDATE deepview_reports_v2 SET status='failed' WHERE report_id=?"
        if only_if_processing:
            # After an unexpected error the generate endpoint may already have
            # finished the report; do not overwrite a row it has updated.
            sql += " AND status='processing'"
        try:
            from hermes_state import SessionDB
            db = SessionDB()
            with db._lock:
                db._conn.execute(sql, (report_id,))
                db._conn.commit()
        except Exception as e:
            logger.error(f"[DeepviewSSE] Failed to mark report {report_id} as failed: {e}")

    # Insert the 'processing' placeholder so the report is visible right away.
    if is_recording:
        try:
            from hermes_state import SessionDB
            db = SessionDB()
            with db._lock:
                db._conn.execute(
                    "INSERT INTO deepview_reports_v2 (report_id, context_id, user_id, report_json, created_at, status) VALUES (?, ?, ?, '{}', ?, 'processing')",
                    (report_id, context_id, user_id, time.time())
                )
                db._conn.commit()
        except Exception as e:
            logger.error(f"Failed to create processing report placeholder: {e}")

    async def _wrapper() -> None:
        # 1. ASR stage
        try:
            await process_uploaded_material_oss(
                oss_key=oss_key,
                original_filename=filename,
                context_id=context_id,
                push_event_fn=self._pushEvent,
                user_id=user_id,
                org_id=user.get("org", "org_001")
            )
        except Exception as e:
            # Without this the exception would only surface as an unretrieved
            # task error and the placeholder would stay 'processing' forever.
            logger.error(f"[DeepviewSSE] Material processing failed for {context_id}: {e}")
            if is_recording:
                _mark_report_failed(only_if_processing=True)
            return
        if not is_recording:
            return
        # 2. X-ray stage (auto orchestrate): call our own generate endpoint.
        try:
            import aiohttp
            # PORT still wins when set; otherwise use the port this server was
            # started on (8645 only if that attribute is unavailable).
            port = int(os.environ.get('PORT', getattr(self, "_port", 8645)))
            url = f"http://127.0.0.1:{port}/deepview/report/generate"
            headers = {"Authorization": auth_header}
            logger.info(f"[DeepviewSSE] Auto-triggering report generation internally for {context_id} -> {report_id}")
            async with aiohttp.ClientSession() as session:
                async with session.post(url, json={"contextId": context_id, "doctorId": "doc_001", "presetReportId": report_id}, headers=headers) as resp:
                    res_json = await resp.json()
            if res_json.get("success"):
                self._pushEvent(user_id, "report:ready", {"reportId": report_id})
            else:
                logger.error(f"[DeepviewSSE] Internal generate failed: {res_json}")
                _mark_report_failed(only_if_processing=False)
        except Exception as e:
            logger.error(f"[DeepviewSSE] Wrapper pipeline failed: {e}")
            _mark_report_failed(only_if_processing=True)

    # The event loop keeps only weak references to tasks, so hold a strong one
    # until the task finishes; otherwise it may be garbage-collected mid-run.
    task = asyncio.create_task(_wrapper())
    background_tasks = getattr(self, "_backgroundTasks", None)
    if background_tasks is None:
        background_tasks = set()
        self._backgroundTasks = background_tasks
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    return web.json_response({"received": True, "reportId": report_id if is_recording else None}, headers=_CORS_HEADERS)
# ──────────────────────────────────────────────
# 会话列表 API
# ──────────────────────────────────────────────
async def _handleSessionsList(self, request: "web.Request") -> "web.Response":
    """
    GET /deepview/sessions/list

    List the calling user's deepview sessions from SessionDB.

    Response: ``{"sessions": [{id, title, startedAt, messageCount, preview}]}``.

    Side effect: unless the caller is ``dev_user_001``, every deepview session
    whose ``user_id`` is NULL or ``dev_user_001`` is first re-assigned to the
    caller. A failure of that migration is logged and does not fail the request.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)
    userId = user["userId"]
    try:
        from hermes_state import SessionDB
        store = SessionDB()

        # Auto-migration: adopt orphaned / dev-owned deepview sessions.
        if userId != "dev_user_001":
            try:
                conn = store._conn
                with store._lock:
                    conn.execute("BEGIN IMMEDIATE")
                    adopted = conn.execute(
                        "UPDATE sessions SET user_id = ? WHERE source = 'deepview' AND (user_id IS NULL OR user_id = 'dev_user_001')",
                        (userId,)
                    ).rowcount
                    conn.commit()
                    if adopted > 0:
                        logger.info("[DeepviewSSE] Auto-migrated %d orphan sessions to userId=%s",
                                    adopted, userId[:8])
            except Exception as e:
                logger.warning("[DeepviewSSE] Session migration failed: %s", e)
                try:
                    conn.rollback()
                except Exception:
                    pass

        # In dev mode every deepview session is returned; otherwise only the caller's.
        devMode = os.environ.get("DEEPVIEW_AUTH_DEV_MODE", "")
        sessions = [
            {
                "id": entry["id"],
                "title": entry.get("title") or entry.get("preview") or "新对话",
                "startedAt": entry.get("started_at"),
                "messageCount": entry.get("message_count", 0),
                "preview": entry.get("preview", ""),
            }
            for entry in store.list_sessions_rich(source="deepview", limit=100)
            if devMode or entry.get("user_id") == userId
        ]
        return web.json_response({"sessions": sessions}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to list sessions for userId=%s: %s", userId, e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
# ──────────────────────────────────────────────
# 报告持久化
# ──────────────────────────────────────────────
def _initReportsDb(self) -> None:
    """Ensure the deepview_reports_v2 table and its user index exist.

    An already-existing table that lacks the ``status`` or ``client_id``
    column gets it added via ALTER TABLE. Any error is logged as a warning
    and swallowed.
    """
    # (column name, statement that adds it) for columns an existing table may lack.
    optionalColumns = (
        ("status", "ALTER TABLE deepview_reports_v2 ADD COLUMN status TEXT DEFAULT 'completed'"),
        ("client_id", "ALTER TABLE deepview_reports_v2 ADD COLUMN client_id TEXT DEFAULT NULL"),
    )
    try:
        from hermes_state import SessionDB
        store = SessionDB()
        with store._lock:
            conn = store._conn
            conn.execute("""
                CREATE TABLE IF NOT EXISTS deepview_reports_v2 (
                    report_id TEXT PRIMARY KEY,
                    context_id TEXT NOT NULL,
                    user_id TEXT NOT NULL,
                    report_json TEXT NOT NULL,
                    created_at REAL NOT NULL,
                    status TEXT DEFAULT 'completed',
                    client_id TEXT DEFAULT NULL
                )
            """)
            # PRAGMA table_info yields one row per column; index 1 is the column name.
            tableInfo = conn.execute("PRAGMA table_info(deepview_reports_v2)").fetchall()
            present = [info[1] for info in tableInfo]
            for columnName, addColumnSql in optionalColumns:
                if columnName not in present:
                    conn.execute(addColumnSql)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_deepview_reports_v2_user
                ON deepview_reports_v2(user_id)
            """)
            conn.commit()
        logger.info("[DeepviewSSE] Reports table ready")
    except Exception as e:
        logger.warning("[DeepviewSSE] Failed to init reports table: %s", e)
# ──────────────────────────────────────────────
# 钉选板持久化
# ──────────────────────────────────────────────
def _initPinsDb(self) -> None:
    """Ensure the deepview_pinned_items table and its (user_id, context_id) index exist.

    Any error is logged as a warning and swallowed.
    """
    schemaStatements = (
        """
            CREATE TABLE IF NOT EXISTS deepview_pinned_items (
                id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                title TEXT,
                snippet TEXT,
                context_id TEXT DEFAULT 'deepview',
                chat_id TEXT,
                created_at REAL NOT NULL
            )
        """,
        """
            CREATE INDEX IF NOT EXISTS idx_deepview_pins_user
            ON deepview_pinned_items(user_id, context_id)
        """,
    )
    try:
        from hermes_state import SessionDB
        store = SessionDB()
        with store._lock:
            # Table first, then the index that refers to it.
            for statement in schemaStatements:
                store._conn.execute(statement)
            store._conn.commit()
        logger.info("[DeepviewSSE] Pinned items table ready")
    except Exception as e:
        logger.warning("[DeepviewSSE] Failed to init pins table: %s", e)
async def _handlePinsList(self, request: "web.Request") -> "web.Response":
    """GET /deepview/pins/list — return all pinned items of the current user, newest first.

    Response: ``{"pins": [{id, title, snippet, date, contextId, chatId}]}``
    where ``date`` is the creation time formatted as MM-DD.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)
    userId = user["userId"]
    try:
        import datetime
        from hermes_state import SessionDB
        store = SessionDB()
        with store._lock:
            rows = store._conn.execute(
                "SELECT id, title, snippet, context_id, chat_id, created_at "
                "FROM deepview_pinned_items WHERE user_id = ? ORDER BY created_at DESC",
                (userId,)
            ).fetchall()
        pins = []
        for raw in rows:
            record = dict(raw)
            # created_at is an epoch timestamp; show it as zero-padded MM-DD (local time).
            createdAt = datetime.datetime.fromtimestamp(record["created_at"])
            pins.append({
                "id": record["id"],
                "title": record["title"] or "提取结论",
                "snippet": record["snippet"] or "",
                "date": createdAt.strftime("%m-%d"),
                "contextId": record["context_id"],
                "chatId": record["chat_id"] or "",
            })
        return web.json_response({"pins": pins}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to list pins: %s", e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
async def _handlePinsAdd(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/pins/add
    Body: { id, title, snippet, contextId, chatId }

    Insert a pinned item for the current user. When the same user already has
    a pin with identical contextId, title and snippet, nothing is inserted and
    the existing id is returned.

    Returns:
        200 ``{"success": True, "id": ...}`` after inserting,
        200 ``{"duplicate": True, "id": ...}`` for a duplicate,
        400 when the body is not a JSON object, 401 when unauthenticated,
        500 on database errors.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)
    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(body, dict):
        # Valid JSON that is not an object (list, string, ...) has no fields to read.
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    userId = user["userId"]
    # Generate an id when the client sends none (or null / empty). A random
    # suffix is used because a seconds-resolution timestamp collides on the
    # primary key when two pins are added within the same second.
    pinId = body.get("id") or f"pin_{uuid.uuid4().hex[:12]}"
    title = body.get("title", "提取结论")
    snippet = body.get("snippet", "")
    contextId = body.get("contextId", "deepview")
    chatId = body.get("chatId", "")
    try:
        from hermes_state import SessionDB
        db = SessionDB()
        # De-duplicate: same user + context + title + snippet is not inserted twice.
        with db._lock:
            existing = db._conn.execute(
                "SELECT id FROM deepview_pinned_items "
                "WHERE user_id = ? AND context_id = ? AND title = ? AND snippet = ?",
                (userId, contextId, title, snippet)
            ).fetchone()
        if existing:
            return web.json_response({"duplicate": True, "id": existing["id"]}, headers=_CORS_HEADERS)

        def _do(conn):
            conn.execute(
                "INSERT INTO deepview_pinned_items (id, user_id, title, snippet, context_id, chat_id, created_at) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                (pinId, userId, title, snippet, contextId, chatId, time.time())
            )

        db._execute_write(_do)
        return web.json_response({"success": True, "id": pinId}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to add pin: %s", e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
async def _handlePinsRemove(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/pins/remove
    Body: { id }

    Delete the pin with the given id, but only if it belongs to the current
    user. Deleting an id that does not exist still reports success.

    Returns:
        200 ``{"success": True}``,
        400 when the body is not a JSON object or ``id`` is missing,
        401 when unauthenticated, 500 on database errors.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)
    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(body, dict):
        # Valid JSON that is not an object (list, string, ...) has no "id" to read.
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    userId = user["userId"]
    pinId = body.get("id", "")
    if not pinId:
        return web.json_response({"error": "Missing id"}, status=400, headers=_CORS_HEADERS)
    try:
        from hermes_state import SessionDB
        db = SessionDB()

        def _do(conn):
            # The user_id condition stops a user from deleting someone else's pin.
            conn.execute(
                "DELETE FROM deepview_pinned_items WHERE id = ? AND user_id = ?",
                (pinId, userId)
            )

        db._execute_write(_do)
        return web.json_response({"success": True}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to remove pin: %s", e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
# ──────────────────────────────────────────────
# 素材文件持久化
# ──────────────────────────────────────────────
# ★ Base knowledge-base files: ten chapter files (ids xz_0 … xz_9), all in the
# "deepview" context. Each tuple is (id, filename, context_id). _initMaterialsDb
# pre-seeds them into the deepview_materials table with source='base' and
# created_at=0, so base files and other rows are managed in one place.
_DEEPVIEW_BASE_FILES = [
("xz_0", "00_前言.md", "deepview"),
("xz_1", "01_第一章 品牌机会论证.md", "deepview"),
("xz_2", "02_第二章 原点市场与产品概念.md", "deepview"),
("xz_3", "03_第三章 品牌九要素之语言表达.md", "deepview"),
("xz_4", "04_第四章 品牌九要素之视觉表达.md", "deepview"),
("xz_5", "05_第五章 产品设计、广告与公关建品牌.md", "deepview"),
("xz_6", "06_第六章 新产品上市之品牌试错期.md", "deepview"),
("xz_7", "07_第七章 品牌的增长.md", "deepview"),
("xz_8", "08_第八章 连锁合作谈判.md", "deepview"),
("xz_9", "09_第九章 打赢商战.md", "deepview"),
]
def _initMaterialsDb(self) -> None:
    """Ensure the deepview_materials table and its context index exist, then seed the base files.

    Seeding uses INSERT OR IGNORE, so rows whose id already exists are left
    untouched. Any error is logged as a warning and swallowed.
    """
    try:
        from hermes_state import SessionDB
        store = SessionDB()
        with store._lock:
            conn = store._conn
            conn.execute("""
                CREATE TABLE IF NOT EXISTS deepview_materials (
                    id TEXT PRIMARY KEY,
                    filename TEXT NOT NULL,
                    context_id TEXT DEFAULT 'deepview',
                    user_id TEXT,
                    source TEXT DEFAULT 'base',
                    created_at REAL NOT NULL
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_deepview_materials_ctx
                ON deepview_materials(context_id)
            """)
            # Seed the base knowledge-base files; created_at=0 marks a base file.
            seedRows = [
                (fileId, fileName, contextId, 0)
                for fileId, fileName, contextId in self._DEEPVIEW_BASE_FILES
            ]
            conn.executemany(
                "INSERT OR IGNORE INTO deepview_materials (id, filename, context_id, source, created_at) "
                "VALUES (?, ?, ?, 'base', ?)",
                seedRows
            )
            conn.commit()
        logger.info("[DeepviewSSE] Materials table ready")
    except Exception as e:
        logger.warning("[DeepviewSSE] Failed to init materials table: %s", e)
async def _handleMaterialsList(self, request: "web.Request") -> "web.Response":
    """
    GET /deepview/materials/list?contextId=deepview

    Return every material row stored for the given context (``contextId``
    defaults to "deepview"), ordered by creation time ascending.

    Response: ``{"files": [{id, name, projectId, source}]}``.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)
    contextId = request.query.get("contextId", "deepview")
    try:
        from hermes_state import SessionDB
        store = SessionDB()
        # NOTE(review): the query filters by context only, not by user_id, so any
        # authenticated user sees all rows of that context. Confirm this is intended.
        with store._lock:
            rows = store._conn.execute(
                "SELECT id, filename, context_id, source "
                "FROM deepview_materials WHERE context_id = ? ORDER BY created_at ASC",
                (contextId,)
            ).fetchall()
        files = [
            {
                "id": record["id"],
                "name": record["filename"],
                "projectId": record["context_id"],
                "source": record["source"],
            }
            for record in map(dict, rows)
        ]
        return web.json_response({"files": files}, headers=_CORS_HEADERS)
    except Exception as e:
        logger.error("Failed to list materials: %s", e)
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
# ──────────────────────────────────────────────
# 客户信息 API (Phase 4)
# ──────────────────────────────────────────────
async def _handleClientsList(self, request: "web.Request") -> "web.Response":
    """
    GET /deepview/clients/list

    List client records by scanning the ``clients`` directory inside the
    caller's storage directory. Every sub-directory is one client; its
    directory name serves as both id and display name.

    Response: ``{"clients": [{id, name, hasProfile}]}``; an empty list when
    the directory does not exist.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)
    # NOTE(review): the storage dir is keyed by user["sub"] (fallback "unknown")
    # while other handlers key on user["userId"]. Confirm _extractUser sets "sub".
    clientsDir = os.path.join(self._getUserStorageDir(user.get("sub", "unknown")), "clients")
    entryNames = os.listdir(clientsDir) if os.path.exists(clientsDir) else []
    clients = [
        {
            "id": entryName,
            "name": entryName,
            "hasProfile": os.path.exists(os.path.join(clientsDir, entryName, "profile.md"))
        }
        for entryName in entryNames
        if os.path.isdir(os.path.join(clientsDir, entryName))
    ]
    return web.json_response({"clients": clients}, headers=_CORS_HEADERS)
async def _handleClientCreate(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/clients/create
    Body: { name, phone }

    Create a client record: a new directory ``clients/p_<8 hex chars>`` in the
    caller's storage directory containing a ``profile.md`` with the name,
    phone and creation date.

    Returns:
        200 ``{"id", "name", "phone"}``,
        400 when the body is not a JSON object or ``name`` is empty,
        401 when unauthenticated, 500 when the profile cannot be written.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)
    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(body, dict):
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    # Clients may send the phone (or name) as a JSON number or null; coerce to
    # text instead of crashing on .strip().
    rawName = body.get("name", "")
    rawPhone = body.get("phone", "")
    name = "" if rawName is None else str(rawName).strip()
    phone = "" if rawPhone is None else str(rawPhone).strip()
    if not name:
        return web.json_response({"error": "Name is required"}, status=400, headers=_CORS_HEADERS)

    clientId = f"p_{str(uuid.uuid4())[:8]}"
    userDir = self._getUserStorageDir(user.get("sub", "unknown"))
    clientDir = os.path.join(userDir, "clients", clientId)
    profileData = f"# 客户基本档案\n姓名:{name}\n手机号/尾号:{phone}\n建档时间:{time.strftime('%Y-%m-%d')}\n"
    try:
        os.makedirs(clientDir, exist_ok=True)
        with open(os.path.join(clientDir, "profile.md"), "w", encoding="utf-8") as f:
            f.write(profileData)
    except OSError as e:
        # Disk / permission problems become a JSON error carrying the CORS headers.
        logger.error(f"[DeepviewSSE] Failed to create client {clientId}: {e}")
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
    return web.json_response({
        "id": clientId,
        "name": name,
        "phone": phone
    }, headers=_CORS_HEADERS)
async def _handleClientProfile(self, request: "web.Request") -> "web.Response":
    """
    GET /deepview/clients/{id}/profile

    Read and return the ``profile.md`` of one client from the caller's
    storage directory.

    Returns:
        200 ``{"id", "profileContent"}`` (empty content when no profile exists),
        400 when the id is missing or is not a plain directory name,
        401 when unauthenticated, 500 when the file cannot be read.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)
    clientId = request.match_info.get("id", "")
    if not clientId:
        return web.json_response({"error": "Missing client ID"}, status=400, headers=_CORS_HEADERS)
    # The id becomes a path component below: refuse anything that could leave
    # the clients/ directory (separators, "." / "..", NUL).
    if clientId in (".", "..") or any(ch in clientId for ch in ("/", "\\", "\x00")):
        return web.json_response({"error": "Invalid client ID"}, status=400, headers=_CORS_HEADERS)

    userDir = self._getUserStorageDir(user.get("sub", "unknown"))
    profilePath = os.path.join(userDir, "clients", clientId, "profile.md")
    content = ""
    try:
        with open(profilePath, "r", encoding="utf-8") as f:
            content = f.read()
    except (FileNotFoundError, NotADirectoryError):
        # No profile yet: answer with empty content.
        pass
    except (OSError, UnicodeDecodeError) as e:
        logger.error(f"[DeepviewSSE] Failed to read client profile {clientId}: {e}")
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
    return web.json_response({
        "id": clientId,
        "profileContent": content
    }, headers=_CORS_HEADERS)
# ──────────────────────────────────────────────
# 报告归档 (Inbox → Client)
# ──────────────────────────────────────────────
async def _handleReportArchive(self, request: "web.Request") -> "web.Response":
    """
    POST /deepview/reports/archive
    Body: { reportId, clientId }

    Archive an inbox report under a client:
      1. move ``inbox/{reportId}/`` to ``clients/{clientId}/history/{reportId}/``
         inside the caller's storage directory (skipped when the inbox
         directory does not exist);
      2. set ``client_id`` and ``context_id`` on the caller's row in
         ``deepview_reports_v2``.

    Returns:
        200 ``{"success": True, "reportId", "clientId"}``,
        400 for an invalid body or ids that are not plain path components,
        401 when unauthenticated, 500 when the move or the DB update fails.
    """
    user = await self._extractUser(request)
    if not user:
        return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS)
    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)
    if not isinstance(body, dict):
        return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS)

    # Non-string ids (numbers, null, ...) are treated as missing instead of
    # crashing on .strip().
    rawReportId = body.get("reportId", "")
    rawClientId = body.get("clientId", "")
    reportId = rawReportId.strip() if isinstance(rawReportId, str) else ""
    clientId = rawClientId.strip() if isinstance(rawClientId, str) else ""
    if not reportId or not clientId:
        return web.json_response({"error": "Missing reportId or clientId"}, status=400, headers=_CORS_HEADERS)

    def _isSafeComponent(value: str) -> bool:
        """True when value can only name an entry directly inside its parent directory."""
        return value not in (".", "..") and not any(ch in value for ch in ("/", "\\", "\x00"))

    # Both ids are joined into filesystem paths and handed to shutil.move, so
    # reject anything that could escape the user's storage directory.
    if not (_isSafeComponent(reportId) and _isSafeComponent(clientId)):
        return web.json_response({"error": "Invalid reportId or clientId"}, status=400, headers=_CORS_HEADERS)

    userId = user["userId"]
    userDir = self._getUserStorageDir(user.get("sub", "unknown"))

    # 1. Move the files: inbox/{reportId}/ -> clients/{clientId}/history/{reportId}/
    import shutil
    inboxDir = os.path.join(userDir, "inbox", reportId)
    targetDir = os.path.join(userDir, "clients", clientId, "history", reportId)
    if os.path.exists(inboxDir):
        try:
            os.makedirs(os.path.dirname(targetDir), exist_ok=True)
            shutil.move(inboxDir, targetDir)
        except OSError as e:
            # shutil.Error is an OSError subclass, so it is covered as well.
            logger.error(f"[DeepviewSSE] Archive move failed: {e}")
            return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
        logger.info(f"[DeepviewSSE] Archived {inboxDir} → {targetDir}")

    # 2. Update the DB row: set client_id and point context_id at the new location.
    try:
        from hermes_state import SessionDB
        db = SessionDB()
        with db._lock:
            db._conn.execute(
                "UPDATE deepview_reports_v2 SET client_id=?, context_id=? WHERE report_id=? AND user_id=?",
                (clientId, f"recording:{clientId}/{reportId}", reportId, userId)
            )
            db._conn.commit()
        logger.info(f"[DeepviewSSE] DB archived: {reportId} → client {clientId}")
    except Exception as e:
        logger.error(f"[DeepviewSSE] Archive DB update failed: {e}")
        return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS)
    return web.json_response({"success": True, "reportId": reportId, "clientId": clientId}, headers=_CORS_HEADERS)
# ──────────────────────────────────────────────
# 服务器生命周期
# ──────────────────────────────────────────────
async def start(self) -> None:
    """Build the aiohttp application, register every route and start listening.

    Raises:
        RuntimeError: if aiohttp could not be imported.
    """
    if not AIOHTTP_AVAILABLE:
        raise RuntimeError("aiohttp is required for DeepviewSSEServer")

    self._app = web.Application()
    router = self._app.router

    # (registrar, path, handler), kept in registration order.
    routeTable = (
        (router.add_get, "/deepview/health", self._handleHealth),
        (router.add_get, "/deepview/chat/history", self._handleHistory),
        (router.add_get, "/deepview/sessions/list", self._handleSessionsList),
        (router.add_get, "/deepview/events", self._handleSSEConnect),
        (router.add_post, "/deepview/chat", self._handleChat),
        (router.add_post, "/deepview/report/generate", self._handleReportGenerate),
        (router.add_get, "/deepview/report/get", self._handleReportGet),
        (router.add_get, "/deepview/reports/list", self._handleReportsList),
        (router.add_get, "/deepview/profile", self._handleProfileGet),
        (router.add_post, "/deepview/profile", self._handleProfilePost),
        (router.add_post, "/deepview/materials/upload_token", self._handleMaterialsUploadToken),
        (router.add_post, "/deepview/materials/confirm", self._handleMaterialsConfirm),
        (router.add_get, "/deepview/materials/list", self._handleMaterialsList),
        (router.add_get, "/deepview/pins/list", self._handlePinsList),
        (router.add_post, "/deepview/pins/add", self._handlePinsAdd),
        (router.add_post, "/deepview/pins/remove", self._handlePinsRemove),
        # Client information APIs
        (router.add_get, "/deepview/clients/list", self._handleClientsList),
        (router.add_post, "/deepview/clients/create", self._handleClientCreate),
        (router.add_get, "/deepview/clients/{id}/profile", self._handleClientProfile),
        # Report archive API (Inbox → Client)
        (router.add_post, "/deepview/reports/archive", self._handleReportArchive),
    )
    for register, path, handler in routeTable:
        register(path, handler)
    # CORS preflight (OPTIONS) for every /deepview/ path, registered last.
    router.add_route("OPTIONS", "/deepview/{path:.*}", self._handleOptions)

    self._runner = web.AppRunner(self._app)
    await self._runner.setup()
    await web.TCPSite(self._runner, self._host, self._port).start()

    if os.environ.get("DEEPVIEW_AUTH_DEV_MODE", ""):
        authMode = "dev mode (no auth)"
    else:
        authMode = f"MindPass delegation ({MINDPASS_BASE_URL})"
    logger.info("[DeepviewSSE] Server started on http://%s:%d (auth: %s)",
                self._host, self._port, authMode)
async def stop(self) -> None:
    """Stop the server: signal every SSE client queue, forget the clients, clean up the runner."""
    # A None item is put on each connection queue to disconnect its SSE client.
    # The outer mapping is snapshotted before iterating.
    for connections in list(self._sseClients.values()):
        for queue in connections.values():
            queue.put_nowait(None)
    self._sseClients.clear()
    if self._runner:
        await self._runner.cleanup()
    logger.info("[DeepviewSSE] Server stopped")
async def _handleOptions(self, request: "web.Request") -> "web.Response":
    """Answer a CORS preflight request with an empty 200 response carrying the CORS headers."""
    preflightResponse = web.Response(status=200, headers=_CORS_HEADERS)
    return preflightResponse
# ── 独立启动入口 ──
async def main():
    """Run the Deepview SSE server standalone (when this file is executed directly).

    Host and port come from DEEPVIEW_SSE_HOST / DEEPVIEW_SSE_PORT, defaulting
    to 127.0.0.1:8643. The server is stopped when the coroutine exits.
    """
    import sys

    # Put the directory three levels above this file at the front of sys.path.
    importRoot = os.path.abspath(__file__)
    for _ in range(3):
        importRoot = os.path.dirname(importRoot)
    sys.path.insert(0, importRoot)

    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")

    listenHost = os.getenv("DEEPVIEW_SSE_HOST", "127.0.0.1")
    listenPort = int(os.getenv("DEEPVIEW_SSE_PORT", "8643"))
    server = DeepviewSSEServer(host=listenHost, port=listenPort)
    await server.start()

    # Keep the process alive until interrupted; always stop the server on exit.
    try:
        while True:
            await asyncio.sleep(3600)
    except KeyboardInterrupt:
        pass
    finally:
        await server.stop()


if __name__ == "__main__":
    asyncio.run(main())