""" 深维面诊智能体 SSE 桥接适配器 (deepview_sse.py) 独立的 aiohttp HTTP 服务器,专为深维面诊智能体前端提供: - GET /deepview/events — 全局 SSE 事件流(JWT 认证) - POST /deepview/chat — 发起对话(JWT 认证) - GET /deepview/health — 健康检查 认证:复用 MindPass 统一认证体系,JWT sub 字段作为 userId。 隔离:三层模型 — userId (身份) × connId (标签页) × chatId (对话)。 翻译规则(基于 Hermes 引擎源码审计): - stream_delta_callback(text) → SSE agent:chunk - stream_delta_callback(None) → 【丢弃】 - tool_progress("tool.started") → SSE agent:thinking - tool_progress("tool.completed") → 【丢弃】 - tool_progress("reasoning.available") → 【丢弃】 - run_conversation() returns → SSE agent:done - run_conversation() raises → SSE agent:error """ import asyncio import json import logging import os import time import uuid from typing import Any, Dict, Optional try: from aiohttp import web, ClientSession, ClientTimeout AIOHTTP_AVAILABLE = True except ImportError: AIOHTTP_AVAILABLE = False web = None # type: ignore[assignment] ClientSession = None # type: ignore[assignment] logger = logging.getLogger(__name__) # ── MindPass 验证委托(路径 B:不持有密钥,调用 MindPass /api/auth/me 验证 Token) ── MINDPASS_BASE_URL = os.environ.get("MINDPASS_BASE_URL", "http://127.0.0.1:3020") # 内存缓存:避免同一 token 高频重复请求 MindPass(TTL 60s) _tokenCache: Dict[str, tuple] = {} # token -> (userDict, expireTimestamp) _TOKEN_CACHE_TTL = 60 # 秒 async def _verifyTokenAsync(token: str) -> Optional[dict]: """ ★ 路径 B:委托 MindPass 验证 Token。 本服务不持有 JWT 签发密钥,通过 HTTP 调用 MindPass 中央认证服务。 返回 {userId, phone, name, avatarUrl} 或 None。 """ if not token: return None # 开发模式:未配置 MINDPASS_BASE_URL 且不是默认值时,允许 userId 直传 devMode = os.environ.get("DEEPVIEW_AUTH_DEV_MODE", "") if devMode: logger.warning("[DeepviewSSE] Auth dev mode enabled, using token as userId") return {"userId": token, "phone": "", "name": "开发用户"} # 检查内存缓存 now = time.time() cached = _tokenCache.get(token) if cached and cached[1] > now: return cached[0] # 调用 MindPass /api/auth/me try: timeout = ClientTimeout(total=5) async with ClientSession(timeout=timeout) as session: async with session.get( f"{MINDPASS_BASE_URL}/api/auth/me", headers={"Authorization": f"Bearer {token}"}, ) as resp: if resp.status != 200: logger.info("[DeepviewSSE] MindPass returned %d for token verification", resp.status) return None data = await resp.json() user = { "userId": data["id"], "phone": data.get("phone", ""), "name": data.get("name", ""), "avatarUrl": data.get("avatarUrl", ""), } # 缓存结果 _tokenCache[token] = (user, now + _TOKEN_CACHE_TTL) return user except Exception as e: logger.error("[DeepviewSSE] MindPass verification failed: %s", e) return None def _verifyToken(token: str) -> Optional[dict]: """同步兼容包装:在已有 event loop 的上下文中使用 _verifyTokenAsync。""" # 被 _extractUser 中的同步路径调用时的兜底 try: loop = asyncio.get_event_loop() if loop.is_running(): # 在 aiohttp handler 中,应使用 _verifyTokenAsync import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as pool: future = pool.submit(asyncio.run, _verifyTokenAsync(token)) return future.result(timeout=6) else: return loop.run_until_complete(_verifyTokenAsync(token)) except Exception as e: logger.error("[DeepviewSSE] Sync token verify failed: %s", e) return None # ── 工具名 → 用户可读标签 ── _STEP_LABELS = { "read_file": "📖 正在查阅知识库", "search_files": "🔍 正在搜索相关文件", "web_search": "🌐 正在搜索网络", "web_extract": "📄 正在提取网页内容", "terminal": "⚙️ 正在执行命令", "vision_analyze": "👁️ 正在分析图片", "patch": "✏️ 正在修改文件", "write_file": "💾 正在写入文件", } def _translate_step(toolName: str, preview: str) -> str: """将 Hermes 工具名翻译为前端展示文案。""" return _STEP_LABELS.get(toolName, f"🔧 {preview or toolName}") # ── CORS ── _CORS_HEADERS = { "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS", "Access-Control-Allow-Headers": "Content-Type, Authorization", "Access-Control-Max-Age": "600", } class DeepviewSSEServer: """ 深维面诊智能体前端的专属 SSE 服务器。 职责: 1. 管理多顾问、多标签页的 SSE 长连接 2. 将 Hermes AIAgent 的底层回调翻译为封闭事件集中的 7 个 SSE 事件 3. 提供业务 API(对话、文件上传、钉入结论) 隔离模型: - userId (身份层) ← MindPass JWT sub,决定事件推给谁 - connId (连接层) ← 每个浏览器标签页唯一,同用户多标签页共存 - chatId (会话层) ← 每个对话 Tab 唯一,前端按 chatId 分桶渲染 """ def __init__(self, host: str = "127.0.0.1", port: int = 8643): self._host = host self._port = port self._app: Optional["web.Application"] = None self._runner: Optional["web.AppRunner"] = None # userId → { connId → asyncio.Queue } # 同一用户的多个标签页各有自己的 Queue,事件广播到所有连接 self._sseClients: Dict[str, Dict[str, "asyncio.Queue[Optional[str]]"]] = {} # ★ 初始化钉选板存储 self._initPinsDb() # ★ 初始化素材文件表 self._initMaterialsDb() # ★ 初始化报告持久化表 self._initReportsDb() # ★ 初始化扩展档案表 self._initProfilesDb() # ────────────────────────────────────────────── # 存储辅助方法 # ────────────────────────────────────────────── def _getUserStorageDir(self, userId: str) -> str: """返回当前用户的专属存储沙箱根路径,不存在时自动创建。""" storageDir = os.getenv("DEEPVIEW_STORAGE_DIR", os.path.expanduser("~/Downloads/Coding/医生助理智能体/backend/storage")) userDir = os.path.join(storageDir, "users", userId) os.makedirs(userDir, exist_ok=True) return userDir # ────────────────────────────────────────────── # ────────────────────────────────────────────── def _pushEvent(self, userId: str, eventName: str, data: dict) -> None: """向指定用户的所有 SSE 连接(标签页)推送事件。""" conns = self._sseClients.get(userId) if not conns: logger.debug("SSE push to disconnected userId=%s, event=%s dropped", userId, eventName) return payload = f"event: {eventName}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" for connId, q in list(conns.items()): try: q.put_nowait(payload) except asyncio.QueueFull: logger.warning("SSE queue full: userId=%s connId=%s, dropping event=%s", userId, connId, eventName) def _broadcastEvent(self, eventName: str, data: dict) -> None: """向所有已连接的用户推送事件(用于全局通知如 wiki:updated)。""" for userId in list(self._sseClients.keys()): self._pushEvent(userId, eventName, data) # ────────────────────────────────────────────── # Hermes AIAgent 回调工厂 # ────────────────────────────────────────────── def _makeStreamDeltaCallback(self, userId: str, chatId: str, loop: asyncio.AbstractEventLoop): """ 创建 stream_delta_callback。 翻译规则: - text (非 None) → agent:chunk - None → 丢弃(CLI 显示层哨兵) """ def callback(delta): if delta is None: return # 丢弃 CLI 哨兵信号 loop.call_soon_threadsafe( self._pushEvent, userId, "agent:chunk", { "chatId": chatId, "text": delta, } ) return callback def _makeToolProgressCallback(self, userId: str, chatId: str, loop: asyncio.AbstractEventLoop): """ 创建 tool_progress_callback。 翻译规则: - tool.started → agent:thinking - tool.completed → 丢弃(前端不需要) - reasoning.available → 丢弃(与 stream_delta 内容重复) - _thinking → 丢弃(子代理中继,本项目不涉及) """ def callback(eventType, name, preview=None, args=None, **kwargs): if eventType == "tool.started": # 过滤内部事件 if name and name.startswith("_"): return message = _translate_step(name, preview) loop.call_soon_threadsafe( self._pushEvent, userId, "agent:thinking", { "chatId": chatId, "step": name, "message": message, } ) # tool.completed, reasoning.available, _thinking → 全部丢弃 return callback # ────────────────────────────────────────────── # 认证辅助 # ────────────────────────────────────────────── async def _extractUser(self, request: "web.Request") -> Optional[dict]: """ 从请求中提取用户信息(异步版本,委托 MindPass 验证)。 优先级: 1. Authorization: Bearer 头(HTTP POST 请求用) 2. ?token= 查询参数(EventSource 连接用,因为 EventSource 不支持自定义 Header) """ # 1. Header authHeader = request.headers.get("Authorization", "") if authHeader.startswith("Bearer "): token = authHeader[7:].strip() user = await _verifyTokenAsync(token) if user: return user # 2. Query parameter token = request.query.get("token", "").strip() if token: return await _verifyTokenAsync(token) return None # ────────────────────────────────────────────── # HTTP Handlers # ────────────────────────────────────────────── def _initProfilesDb(self) -> None: """确保 deepview_user_profiles 表存在,用于存储额外的用户档案""" try: from hermes_state import SessionDB db = SessionDB() with db._lock: db._conn.execute(""" CREATE TABLE IF NOT EXISTS deepview_user_profiles ( user_id TEXT PRIMARY KEY, real_name TEXT, company TEXT, role TEXT, voice_url TEXT, updated_at REAL NOT NULL ) """) # Try to add real_name column if it's missing (for upgrade) try: db._conn.execute("ALTER TABLE deepview_user_profiles ADD COLUMN real_name TEXT") except: pass db._conn.commit() except Exception as e: logger.warning("[DeepviewSSE] Failed to init profiles table: %s", e) async def _handleProfileGet(self, request: "web.Request") -> "web.Response": """GET /deepview/profile""" user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) try: from hermes_state import SessionDB db = SessionDB() with db._lock: row = db._conn.execute( "SELECT real_name, company, role, voice_url FROM deepview_user_profiles WHERE user_id = ?", (user["userId"],) ).fetchone() if row: return web.json_response({ "realName": row[0], "company": row[1], "role": row[2], "voiceUrl": row[3] }, headers=_CORS_HEADERS) else: return web.json_response({ "realName": "", "company": "", "role": "", "voiceUrl": "" }, headers=_CORS_HEADERS) except Exception as e: logger.error("Failed to get profile: %s", e) return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS) async def _handleProfilePost(self, request: "web.Request") -> "web.Response": """POST /deepview/profile""" user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) try: data = await request.json() real_name = data.get("realName", "") company = data.get("company", "") role = data.get("role", "") voice_url = data.get("voiceUrl", "") from hermes_state import SessionDB import time db = SessionDB() with db._lock: db._conn.execute(""" INSERT INTO deepview_user_profiles (user_id, real_name, company, role, voice_url, updated_at) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(user_id) DO UPDATE SET real_name=excluded.real_name, company=excluded.company, role=excluded.role, voice_url=excluded.voice_url, updated_at=excluded.updated_at """, (user["userId"], real_name, company, role, voice_url, time.time())) db._conn.commit() return web.json_response({"success": True}, headers=_CORS_HEADERS) except Exception as e: logger.error("Failed to set profile: %s", e) return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS) async def _handleHistory(self, request: "web.Request") -> "web.Response": """GET /deepview/chat/history?chatId=xxx""" user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) chatId = request.query.get("chatId", "").strip() if not chatId: return web.json_response({"error": "chatId required"}, status=400, headers=_CORS_HEADERS) try: from hermes_state import SessionDB db = SessionDB() # SessionDB returns: [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}] messages = db.get_messages_as_conversation(chatId) return web.json_response({"messages": messages}, headers=_CORS_HEADERS) except Exception as e: logger.error("Failed to load history for chatId=%s: %s", chatId, e) return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS) async def _handleHealth(self, request: "web.Request") -> "web.Response": """GET /deepview/health""" clientCount = sum(len(conns) for conns in self._sseClients.values()) userCount = len(self._sseClients) return web.json_response({ "status": "ok", "service": "deepview-sse", "users": userCount, "connections": clientCount, }) async def _handleSSEConnect(self, request: "web.Request") -> "web.StreamResponse": """ GET /deepview/events?token=xxx&connId=yyy 建立 SSE 长连接。前端 GlobalSSEService 调用此端点。 - token: MindPass JWT(通过 query 参数传递,因为 EventSource 不支持自定义 Header) - connId: 前端自动生成的连接标识(区分同用户多标签页) """ user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401) userId = user["userId"] connId = request.query.get("connId", "").strip() if not connId: connId = f"conn_{int(time.time())}_{uuid.uuid4().hex[:6]}" # 在 userId 下注册该 connId 的 Queue(不影响其他标签页) if userId not in self._sseClients: self._sseClients[userId] = {} self._sseClients[userId][connId] = asyncio.Queue(maxsize=500) q = self._sseClients[userId][connId] headers = { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive", **_CORS_HEADERS, } response = web.StreamResponse(status=200, headers=headers) await response.prepare(request) # 发送连接确认 connectData = {"userId": userId, "connId": connId, "name": user.get("name", "")} connectPayload = f"event: connected\ndata: {json.dumps(connectData, ensure_ascii=False)}\n\n" await response.write(connectPayload.encode("utf-8")) logger.info("[DeepviewSSE] User %s (%s) connected via connId=%s", user.get("name", "?"), userId[:8], connId) try: while True: try: payload = await asyncio.wait_for(q.get(), timeout=30.0) except asyncio.TimeoutError: # 心跳保活 await response.write(b": heartbeat\n\n") continue if payload is None: break # 断开信号 await response.write(payload.encode("utf-8")) except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError, OSError): logger.info("[DeepviewSSE] User %s connId=%s disconnected (connection lost)", userId[:8], connId) finally: # ★ 只清理该 connId,不影响同用户其他标签页 if userId in self._sseClients: self._sseClients[userId].pop(connId, None) if not self._sseClients[userId]: del self._sseClients[userId] logger.info("[DeepviewSSE] connId=%s cleaned up (user %s remaining conns: %d)", connId, userId[:8], len(self._sseClients.get(userId, {}))) return response async def _handleChat(self, request: "web.Request") -> "web.Response": """ POST /deepview/chat Body: { chatId, text, contextId? } Auth: Authorization: Bearer 三戒律第二条:HTTP 只确认收到。业务结果通过 SSE 推送。 注意:userId 从 JWT 中提取,不信任前端传入。 """ user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) try: body = await request.json() except Exception: return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS) # ★ userId 从 JWT 中获取,不信任前端传入 userId = user["userId"] chatId = body.get("chatId", "").strip() text = body.get("text", "").strip() contextId = body.get("contextId", "deepview") doctorId = body.get("doctorId", "doc_001") if not chatId or not text: return web.json_response({"error": "Missing chatId or text"}, status=400, headers=_CORS_HEADERS) # 立即返回 {received: true},不等待 AI asyncio.create_task(self._runChat(userId, chatId, text, contextId, doctorId)) return web.json_response( {"received": True}, headers=_CORS_HEADERS, ) async def _runChat(self, userId: str, chatId: str, text: str, contextId: str, doctorId: str = "doc_001") -> None: """ 在后台执行 Hermes AIAgent 对话 (双上下文包路由)。 全部结果通过 SSE 推送,不通过 HTTP 返回。 """ try: from run_agent import AIAgent from hermes_state import SessionDB db = SessionDB() loop = asyncio.get_event_loop() # 此处获取 user 的 userId userObj = await self._extractUser(request) userId = userObj.get("sub", "unknown") if userObj else "unknown" # 从环境变量获取存储根目录,并获取用户专属沙箱 storageDir = os.getenv("DEEPVIEW_STORAGE_DIR", os.path.expanduser("~/Downloads/Coding/医生助理智能体/backend/storage")) userDir = self._getUserStorageDir(userId) # 1. 加载唯一的 SKILL.md (上下文无关版) skillPath = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "skills", "deepview_assistant", "SKILL.md", ) systemPrompt = "你是深维面诊智能军师。\n\n" if os.path.exists(skillPath): with open(skillPath, "r", encoding="utf-8") as f: systemPrompt = f.read() systemPrompt = systemPrompt.replace("{{STORAGE_DIR}}", storageDir) # 2. 核心架构逻辑:根据 contextId 前缀路由上下文包 if contextId.startswith("recording:"): recordingId = contextId.split(":", 1)[1] # 在物理部署中,应该查找 recording 属于哪个 client,这里假设我们能推断或传进来 # 为了简便演示,假设客户端通过 clientId:recordingId 格式,或者通过 DB 查得 parts = recordingId.split("/") clientId = parts[0] if len(parts) > 1 else "unknown_client" asrId = parts[-1] asrPath = f"{userDir}/clients/{clientId}/history/{asrId}.md" systemPrompt += f""" ## 🎙️ 当前上下文模式:单次面诊录音复盘 - 录音 ASR 文件:{asrPath} - 医生风格档案:{userDir}/doctor_profile.md - 企业知识库目录:{storageDir}/wiki/ ### 你的工作重心 基于这一次面诊录音,分析信任断点、沟通体征、改进建议。 不要读取该客户的完整 profile.md(那是全景档案模式的职责)。 """ elif contextId.startswith("client:"): clientId = contextId.split(":", 1)[1] systemPrompt += f""" ## 👤 当前上下文模式:客户全景档案 - 客户档案目录:{userDir}/clients/{clientId}/ - 核心档案:{userDir}/clients/{clientId}/profile.md - 历史录音目录:{userDir}/clients/{clientId}/history/ - 医生风格档案:{userDir}/doctor_profile.md - 企业知识库目录:{storageDir}/wiki/ ### 你的工作重心 基于该客户的全生命周期数据,提供诊前策略、社交杠杆分析、跨品类破冰方案。 优先读取 profile.md 获取全景概览,必要时深入 history/ 追溯原始录音细节。 """ else: systemPrompt += f"\n\n## 通用模式\n企业知识库目录:{storageDir}/wiki/\n" # 3. 加载上下文履历,实现真正的 Stateful Context history = db.get_messages_as_conversation(chatId) agent = AIAgent( model=os.getenv("DEEPVIEW_MODEL", "gemini-pro-vertex"), enabled_toolsets=["file"], # 严格按 SPEC 仅给文件权限 stream_delta_callback=self._makeStreamDeltaCallback(userId, chatId, loop), tool_progress_callback=self._makeToolProgressCallback(userId, chatId, loop), quiet_mode=True, platform="deepview", session_id=chatId, session_db=db, user_id=userId, ) result = await loop.run_in_executor( None, lambda: agent.run_conversation( user_message=text, system_message=systemPrompt, conversation_history=history, ), ) finalResponse = result.get("final_response", "") if not finalResponse: finalResponse = result.get("error", "(未生成回答)") # agent:done — 标记对话完成 self._pushEvent(userId, "agent:done", { "chatId": chatId, "fullAnswer": finalResponse, }) except Exception as e: logger.error("[DeepviewSSE] Chat error for %s/%s: %s", userId, chatId, e, exc_info=True) self._pushEvent(userId, "agent:error", { "chatId": chatId, "message": f"处理失败:{str(e)}", }) async def _handleReportGenerate(self, request: "web.Request") -> "web.Response": """ POST /deepview/report/generate Body: { contextId, doctorId? } 同步生成 JSON 报告,包含最多 3 次重试逻辑。 """ user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) try: body = await request.json() except Exception: return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS) import uuid userId = user["userId"] contextId = body.get("contextId", "") doctorId = body.get("doctorId", "doc_001") presetReportId = body.get("presetReportId") if not contextId: return web.json_response({"error": "Missing contextId"}, status=400, headers=_CORS_HEADERS) userObj = await self._extractUser(request) userId = userObj.get("sub", "unknown") if userObj else "unknown" userDir = self._getUserStorageDir(userId) storageDir = os.getenv("DEEPVIEW_STORAGE_DIR", os.path.expanduser("~/Downloads/Coding/医生助理智能体/backend/storage")) # 构建基础 Prompt systemPrompt = "你是深维面诊智能军师。\n" skillPath = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "skills", "deepview_assistant", "SKILL.md", ) if os.path.exists(skillPath): with open(skillPath, "r", encoding="utf-8") as f: systemPrompt = f.read() systemPrompt = systemPrompt.replace("{{STORAGE_DIR}}", storageDir) if contextId.startswith("recording:"): recordingId = contextId.split(":", 1)[1] # Inbox 模式:ASR 文件在 inbox/{reportId}/ 下 # 也兼容旧格式 recording:clientId/asrId parts = recordingId.split("/") if len(parts) > 1: # 旧格式(已归档):recording:clientId/asrId clientId = parts[0] asrId = parts[-1] asrPath = f"{userDir}/clients/{clientId}/history/{asrId}.md" clientProfilePath = f"{userDir}/clients/{clientId}/profile.md" else: # 新格式(Inbox):recording:asrId asrId = parts[0] asrPath = f"{userDir}/inbox/{asrId}/asr.md" clientProfilePath = None systemPrompt += f"\n## 🎙️ 第1模式:单条面诊录音复盘\n你的唯一任务是提取基于这通录音的战术复盘。\n录音路径:{asrPath}\n医生档案参考:{userDir}/doctor_profile.md" if clientProfilePath and os.path.exists(clientProfilePath): systemPrompt += f"\n客户档案参考:{clientProfilePath}" systemPrompt += f"\n\n🚨 【系统硬约束:Speaker 天然推断指令】" systemPrompt += f"\n原生录音 ASR 中仅含有 Speaker_1、Speaker_2 等无感情编号。请根据对话上下文(谁在做专业科普与处方,谁在表述容貌焦虑与顾虑)自然反演出真实的医生和客户对应关系。" systemPrompt += f"\n在输出所有报告正文时,请直接使用真实姓名,彻底抹除 Speaker_X 痕迹。" else: clientId = contextId.split(":", 1)[1] if ":" in contextId else contextId systemPrompt += f"\n## 👤 第2模式:客户全景档案战略\n你的唯一任务是生成该客户的全景战略洞察报告。\n档案路径:{userDir}/clients/{clientId}/profile.md\n历史录音目录:{userDir}/clients/{clientId}/history/\n医生风格:{userDir}/doctor_profile.md" # ── Stage 1 内容引擎: Hermes AIAgent (md2md) ── # 只负责生成富文本分析报告,不管格式 contentChecklist = """ ## 输出必须包含的章节(缺一不可) 1. 方案接纳与信任转化评估(核心结果 + 信任指数百分制 + AI洞察段落) 2. 面诊体征数据(医患说话时间比 + 降维科普能力评价并引用比喻原句 + 信任温度曲线各阶段描述) 3. 信任断点溯源(精确到录音时间戳 + 原声对话还原 + 核心病灶诊断标签 + 深度改进分析段落) 4. D.E.E.P. 雷达评估(D诊断/E1共情/E2科普/P处方 四维1-5分评分及各维度鉴定说明 + 以军师口吻给医生的犀利忠告) 5. 行动处方(外部行动轨道:含节点名/动作/话术/目的 + 内部觉察轨道:含改变/动作/话术) 6. 报告编号(DW-AMXG-日期)、客户姓名、面诊医生、核心诉求、最终接纳度、面诊时长秒数 """ systemPrompt += contentChecklist from run_agent import AIAgent from hermes_state import SessionDB db = SessionDB() loop = asyncio.get_event_loop() try: agent = AIAgent( model=os.getenv("DEEPVIEW_MODEL", "gemini-pro-vertex"), enabled_toolsets=["file"], quiet_mode=True, platform="deepview", session_id=str(uuid.uuid4()), session_db=db, user_id=userId, ) result = await loop.run_in_executor( None, lambda: agent.run_conversation( user_message="请根据给定的上下文文件,生成完整的面诊沟通X光片分析报告。按照章节 checklist 确保不遗漏任何模块。", system_message=systemPrompt, ), ) mdReport = result.get("final_response", "").strip() logger.info(f"[DeepviewSSE] Stage 1 (Hermes md2md) done, length={len(mdReport)} chars") if not mdReport or len(mdReport) < 100: return web.json_response({"error": "Stage 1 报告内容为空"}, status=500, headers=_CORS_HEADERS) except Exception as e: logger.error(f"[DeepviewSSE] Stage 1 error: {e}") return web.json_response({"error": f"Stage 1 failed: {e}"}, status=500, headers=_CORS_HEADERS) # ── Stage 2 格式引擎: qwen3-plus via LiteLLM (JSON Schema 硬约束) ── # 纯转换任务,不做内容创造,只做结构对齐 jsonSchema = { "type": "object", "properties": { "reportCode": {"type": "string"}, "patientName": {"type": "string"}, "coreDemand": {"type": "string"}, "acceptance": {"type": "string"}, "duration": {"type": "number"}, "xray": { "type": "object", "properties": { "module1": { "type": "object", "properties": { "result": {"type": "string"}, "trustIndex": {"type": "number"}, "coreInsight": {"type": "string"} }, "required": ["result", "trustIndex", "coreInsight"] }, "module2": { "type": "object", "properties": { "ratio": {"type": "string"}, "metaphorScore": {"type": "string"}, "curveDesc": {"type": "array", "items": {"type": "string"}} }, "required": ["ratio", "metaphorScore", "curveDesc"] }, "module3": { "type": "object", "properties": { "breakpoints": { "type": "array", "items": { "type": "object", "properties": { "timestamp": {"type": "string"}, "contextText": {"type": "array", "items": {"type": "string"}}, "diagnosis": {"type": "string"} }, "required": ["timestamp", "contextText", "diagnosis"] } }, "benchmark": {"type": "string"} }, "required": ["breakpoints", "benchmark"] }, "module4": { "type": "object", "properties": { "radar": { "type": "object", "properties": { "d": {"type": "number"}, "d_comment": {"type": "string"}, "e1": {"type": "number"}, "e1_comment": {"type": "string"}, "e2": {"type": "number"}, "e2_comment": {"type": "string"}, "p": {"type": "number"}, "p_comment": {"type": "string"} }, "required": ["d", "d_comment", "e1", "e1_comment", "e2", "e2_comment", "p", "p_comment"] }, "expertAdvice": {"type": "string"} }, "required": ["radar", "expertAdvice"] }, "module5": { "type": "object", "properties": { "track1": { "type": "array", "items": { "type": "object", "properties": { "node": {"type": "string"}, "action": {"type": "string"}, "strategy": {"type": "string"}, "purpose": {"type": "string"} }, "required": ["node", "action", "strategy", "purpose"] } }, "track2": { "type": "array", "items": { "type": "object", "properties": { "change": {"type": "string"}, "action": {"type": "string"}, "strategy": {"type": "string"} }, "required": ["change", "action", "strategy"] } } }, "required": ["track1", "track2"] } }, "required": ["module1", "module2", "module3", "module4", "module5"] } }, "required": ["reportCode", "patientName", "coreDemand", "acceptance", "duration", "xray"] } try: from openai import OpenAI litellmClient = OpenAI( base_url=os.getenv("GEMINI_BASE_URL", "http://127.0.0.1:4000/v1"), api_key=os.getenv("GEMINI_API_KEY", "sk-placeholder"), ) stage2Response = await loop.run_in_executor( None, lambda: litellmClient.chat.completions.create( model=os.getenv("DEEPVIEW_STAGE2_MODEL", "qwen-plus"), messages=[ {"role": "system", "content": """你是一个 JSON 格式转换器。将用户提供的面诊分析报告精确转换为以下 JSON 结构。 忠实保留原文内容,不要增删改。必须严格使用以下字段名(不要用中文或其他名称)。 【类型约束·极其重要】 - duration: 纯整数(秒数),如 1800,不要带单位 - trustIndex: 纯整数 0-100,如 65,不要带% - radar 里的 d/e1/e2/p: 纯浮点数 1.0-5.0,如 4.5,绝不能写成"4.5/5 分"或"4.5分" 顶层: reportCode, patientName, doctorName, coreDemand, acceptance, duration, xray xray.module1: result, trustIndex, coreInsight xray.module2: ratio, metaphorScore, curveDesc(字符串数组) xray.module3: breakpoints(数组,每项含timestamp/contextText数组/diagnosis), benchmark xray.module4: radar(含d/d_comment/e1/e1_comment/e2/e2_comment/p/p_comment), expertAdvice xray.module5: track1(数组,每项含node/action/strategy/purpose), track2(数组,每项含change/action/strategy)"""}, {"role": "user", "content": mdReport} ], response_format={"type": "json_object"}, max_tokens=8192, user=userId, extra_body={ "metadata": { "deepview_task": contextId, "deepview_stage": "stage2_json", "deepview_user": userId, } }, ) ) jsonOutput = stage2Response.choices[0].message.content.strip() parsedData = json.loads(jsonOutput) logger.info(f"[DeepviewSSE] Stage 2 (json_object) done, keys={list(parsedData.keys())}") xray = parsedData.get('xray', {}) for mk in ['module1', 'module2', 'module3', 'module4', 'module5']: logger.info(f"[DeepviewSSE] xray.{mk}: {'✅' if xray.get(mk) else '❌ MISSING'}") # ── 持久化:写入 DB(hermes state.db)── import uuid reportId = presetReportId if presetReportId else "rep_" + uuid.uuid4().hex[:8] try: from hermes_state import SessionDB reportDb = SessionDB() reportJson = json.dumps(parsedData, ensure_ascii=False) with reportDb._lock: if presetReportId: reportDb._conn.execute( "UPDATE deepview_reports_v2 SET status='completed', report_json=?, created_at=? WHERE report_id=?", (reportJson, time.time(), presetReportId) ) else: reportDb._conn.execute( "INSERT OR REPLACE INTO deepview_reports_v2 (report_id, context_id, user_id, report_json, created_at, status) VALUES (?, ?, ?, ?, ?, 'completed')", (reportId, contextId, userId, reportJson, time.time()) ) reportDb._conn.commit() logger.info(f"[DeepviewSSE] Report persisted to DB: {contextId} -> {reportId}") except Exception as dbErr: logger.error(f"[DeepviewSSE] DB persist failed: {dbErr}") safeBody = json.dumps({"success": True, "reportId": reportId, "data": parsedData}, ensure_ascii=False) return web.Response(text=safeBody, content_type="application/json", headers=_CORS_HEADERS) except Exception as e: logger.error(f"[DeepviewSSE] Stage 2 error: {e}") return web.json_response({"error": f"Stage 2 (json conversion) failed: {e}"}, status=500, headers=_CORS_HEADERS) async def _handleReportGet(self, request: "web.Request") -> "web.Response": """ GET /deepview/report/get?reportId=rep_xxx 从 DB 读取已持久化的报告 JSON。 """ if request.method == "OPTIONS": return web.Response(status=200, headers=_CORS_HEADERS) user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) reportId = request.query.get("reportId", "") if not reportId: return web.json_response({"error": "Missing reportId"}, status=400, headers=_CORS_HEADERS) try: from hermes_state import SessionDB db = SessionDB() with db._lock: cursor = db._conn.execute( "SELECT report_json, created_at, client_id FROM deepview_reports_v2 WHERE report_id = ?", (reportId,) ) row = cursor.fetchone() if not row: return web.json_response({"error": "Report not found", "reportId": reportId}, status=404, headers=_CORS_HEADERS) reportData = json.loads(row[0]) clientId = row[2] # Attach clientId into the data payload so the frontend knows if it's archived if clientId: reportData["clientId"] = clientId safeBody = json.dumps({"success": True, "data": reportData}, ensure_ascii=False) return web.Response(text=safeBody, content_type="application/json", headers=_CORS_HEADERS) except Exception as e: logger.error(f"[DeepviewSSE] Report get error: {e}") return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS) async def _handleReportsList(self, request: "web.Request") -> "web.Response": """ GET /deepview/reports/list 返回当前用户所有的面诊报告,按时间倒序排列。包含生成中的占位记录。 """ user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) try: from hermes_state import SessionDB db = SessionDB() with db._lock: cursor = db._conn.execute( "SELECT report_id, context_id, report_json, created_at, status, client_id FROM deepview_reports_v2 WHERE user_id = ? ORDER BY created_at DESC", (user["userId"],) ) rows = cursor.fetchall() reports = [] for row in rows: rep_id, ctx_id, r_json, c_time, status, client_id = row try: data = json.loads(r_json) except: data = {} reports.append({ "id": rep_id, "contextId": ctx_id, "status": status, "createdAt": c_time, "clientId": client_id, "data": data }) return web.json_response({"success": True, "reports": reports}, headers=_CORS_HEADERS) except Exception as e: logger.error(f"[DeepviewSSE] Reports list error: {e}") return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS) async def _handleMaterialsUploadToken(self, request: "web.Request") -> "web.Response": """ POST /deepview/materials/upload_token Get a pre-signed URL for direct OSS upload. """ user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) try: body = await request.json() except: body = {} filename = body.get("filename", "unknown.pdf") import os ext = os.path.splitext(filename)[1].lower() import uuid file_uid = uuid.uuid4().hex object_key = f"deepview-raw/{user['userId']}/{file_uid}{ext}" oss_ak = os.getenv("ALIYUN_ACCESS_KEY_ID", "") oss_sk = os.getenv("ALIYUN_ACCESS_KEY_SECRET", "") oss_bucket = os.getenv("OSS_BUCKET", "meetings-dev") oss_endpoint = os.getenv("OSS_ENDPOINT", "oss-cn-beijing.aliyuncs.com") if not all([oss_ak, oss_sk]): # 降级:如果服务器没配 OSS 参数,或者本地测试,可能直接失败 return web.json_response({"error": "Server missing OSS credentials (ALIYUN_ACCESS_KEY_ID)"}, status=500, headers=_CORS_HEADERS) import oss2 auth = oss2.Auth(oss_ak, oss_sk) bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket) # 必须显式指定 Content-Type 来生成签名,否则前端带默认 MIME type 会导致 OSS 拒签 403 headers = {'Content-Type': 'application/octet-stream'} put_url = bucket.sign_url("PUT", object_key, 3600, headers=headers) if put_url.startswith("http://"): put_url = put_url.replace("http://", "https://", 1) return web.json_response({ "putUrl": put_url, "ossKey": object_key, "filename": filename }, headers=_CORS_HEADERS) async def _handleMaterialsConfirm(self, request: "web.Request") -> "web.Response": """ POST /deepview/materials/confirm Frontend calls this after OSS PUT succeeds. """ user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) try: body = await request.json() except: return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS) oss_key = body.get("ossKey") context_id = body.get("contextId", "deepview") filename = body.get("filename", "unknown.pdf") if not oss_key: return web.json_response({"error": "Missing ossKey"}, status=400, headers=_CORS_HEADERS) # Offload to background task try: from .deepview_materials import process_uploaded_material_oss except ImportError: from gateway.platforms.deepview_materials import process_uploaded_material_oss import uuid import time report_id = "rep_" + uuid.uuid4().hex[:8] # INSERT placeholder INTO deepview_reports_v2 if context_id.startswith("recording:"): try: from hermes_state import SessionDB db = SessionDB() with db._lock: db._conn.execute( "INSERT INTO deepview_reports_v2 (report_id, context_id, user_id, report_json, created_at, status) VALUES (?, ?, ?, '{}', ?, 'processing')", (report_id, context_id, user["userId"], time.time()) ) db._conn.commit() except Exception as e: logger.error(f"Failed to create processing report placeholder: {e}") async def _wrapper(): # 1. ASR stage await process_uploaded_material_oss( oss_key=oss_key, original_filename=filename, context_id=context_id, push_event_fn=self._pushEvent, user_id=user["userId"] ) # 2. X-ray stage (Auto orchestrate) if context_id.startswith("recording:"): import sys, os try: port = int(os.environ.get('PORT', 8645)) import aiohttp async with aiohttp.ClientSession() as session: url = f"http://127.0.0.1:{port}/deepview/report/generate" headers = {"Authorization": request.headers.get("Authorization", "")} logger.info(f"[DeepviewSSE] Auto-triggering report generation internally for {context_id} -> {report_id}") async with session.post(url, json={"contextId": context_id, "doctorId": "doc_001", "presetReportId": report_id}, headers=headers) as resp: res_json = await resp.json() if res_json.get("success"): self._pushEvent(user["userId"], "report:ready", {"reportId": report_id}) else: logger.error(f"[DeepviewSSE] Internal generate failed: {res_json}") # Mark as failed in DB from hermes_state import SessionDB db = SessionDB() with db._lock: db._conn.execute("UPDATE deepview_reports_v2 SET status='failed' WHERE report_id=?", (report_id,)) db._conn.commit() except Exception as e: logger.error(f"[DeepviewSSE] Wrapper pipeline failed: {e}") asyncio.create_task(_wrapper()) return web.json_response({"received": True, "reportId": report_id if context_id.startswith("recording:") else None}, headers=_CORS_HEADERS) # ────────────────────────────────────────────── # 会话列表 API # ────────────────────────────────────────────── async def _handleSessionsList(self, request: "web.Request") -> "web.Response": """ GET /deepview/sessions/list 从 SessionDB 查询当前用户的所有 deepview 会话。 返回 {sessions: [{id, title, startedAt, preview, messageCount}]} ★ 自动迁移:首次真实登录时,将 dev_user_001 和 NULL 的旧记录归属到当前用户 """ user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) userId = user["userId"] try: from hermes_state import SessionDB db = SessionDB() # ★ 自动迁移:如果当前是真实 JWT 用户(非 dev), # 将所有 deepview source 且 user_id 为 NULL 或 dev_user_001 的 session 归属给当前用户 if userId != "dev_user_001": try: conn = db._conn with db._lock: conn.execute("BEGIN IMMEDIATE") cursor = conn.execute( "UPDATE sessions SET user_id = ? " "WHERE source = 'deepview' AND (user_id IS NULL OR user_id = 'dev_user_001')", (userId,) ) migrated = cursor.rowcount conn.commit() if migrated > 0: logger.info("[DeepviewSSE] Auto-migrated %d orphan sessions to userId=%s", migrated, userId[:8]) except Exception as e: logger.warning("[DeepviewSSE] Session migration failed: %s", e) try: conn.rollback() except Exception: pass # 查询 source="deepview" 的所有会话 rawSessions = db.list_sessions_rich(source="deepview", limit=100) # 按 user_id 过滤(只返回当前用户的会话) sessions = [] for s in rawSessions: # 开发模式或 user_id 匹配 if s.get("user_id") == userId or os.environ.get("DEEPVIEW_AUTH_DEV_MODE", ""): sessions.append({ "id": s["id"], "title": s.get("title") or s.get("preview") or "新对话", "startedAt": s.get("started_at"), "messageCount": s.get("message_count", 0), "preview": s.get("preview", ""), }) return web.json_response({"sessions": sessions}, headers=_CORS_HEADERS) except Exception as e: logger.error("Failed to list sessions for userId=%s: %s", userId, e) return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS) # ────────────────────────────────────────────── # 报告持久化 # ────────────────────────────────────────────── def _initReportsDb(self) -> None: """确保 deepview_reports 表存在(复用 Hermes state.db)。""" try: from hermes_state import SessionDB db = SessionDB() with db._lock: db._conn.execute(""" CREATE TABLE IF NOT EXISTS deepview_reports_v2 ( report_id TEXT PRIMARY KEY, context_id TEXT NOT NULL, user_id TEXT NOT NULL, report_json TEXT NOT NULL, created_at REAL NOT NULL, status TEXT DEFAULT 'completed', client_id TEXT DEFAULT NULL ) """) cursor = db._conn.execute("PRAGMA table_info(deepview_reports_v2)") columns = [row[1] for row in cursor.fetchall()] if 'status' not in columns: db._conn.execute("ALTER TABLE deepview_reports_v2 ADD COLUMN status TEXT DEFAULT 'completed'") if 'client_id' not in columns: db._conn.execute("ALTER TABLE deepview_reports_v2 ADD COLUMN client_id TEXT DEFAULT NULL") db._conn.execute(""" CREATE INDEX IF NOT EXISTS idx_deepview_reports_v2_user ON deepview_reports_v2(user_id) """) db._conn.commit() logger.info("[DeepviewSSE] Reports table ready") except Exception as e: logger.warning("[DeepviewSSE] Failed to init reports table: %s", e) # ────────────────────────────────────────────── # 钉选板持久化 # ────────────────────────────────────────────── def _initPinsDb(self) -> None: """确保 deepview_pinned_items 表存在(复用 Hermes state.db)。""" try: from hermes_state import SessionDB db = SessionDB() with db._lock: db._conn.execute(""" CREATE TABLE IF NOT EXISTS deepview_pinned_items ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, title TEXT, snippet TEXT, context_id TEXT DEFAULT 'deepview', chat_id TEXT, created_at REAL NOT NULL ) """) db._conn.execute(""" CREATE INDEX IF NOT EXISTS idx_deepview_pins_user ON deepview_pinned_items(user_id, context_id) """) db._conn.commit() logger.info("[DeepviewSSE] Pinned items table ready") except Exception as e: logger.warning("[DeepviewSSE] Failed to init pins table: %s", e) async def _handlePinsList(self, request: "web.Request") -> "web.Response": """GET /deepview/pins/list — 获取当前用户的所有钉选项。""" user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) userId = user["userId"] try: from hermes_state import SessionDB db = SessionDB() with db._lock: cursor = db._conn.execute( "SELECT id, title, snippet, context_id, chat_id, created_at " "FROM deepview_pinned_items WHERE user_id = ? ORDER BY created_at DESC", (userId,) ) rows = cursor.fetchall() pins = [] for r in rows: row = dict(r) # 格式化日期为 MM-DD import datetime dt = datetime.datetime.fromtimestamp(row["created_at"]) dateStr = f"{dt.month:02d}-{dt.day:02d}" pins.append({ "id": row["id"], "title": row["title"] or "提取结论", "snippet": row["snippet"] or "", "date": dateStr, "contextId": row["context_id"], "chatId": row["chat_id"] or "", }) return web.json_response({"pins": pins}, headers=_CORS_HEADERS) except Exception as e: logger.error("Failed to list pins: %s", e) return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS) async def _handlePinsAdd(self, request: "web.Request") -> "web.Response": """ POST /deepview/pins/add Body: { id, title, snippet, contextId, chatId } """ user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) try: body = await request.json() except Exception: return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS) userId = user["userId"] pinId = body.get("id", f"pin_{int(time.time())}") title = body.get("title", "提取结论") snippet = body.get("snippet", "") contextId = body.get("contextId", "deepview") chatId = body.get("chatId", "") try: from hermes_state import SessionDB db = SessionDB() # 去重:同 user + context + title + snippet 不重复插入 with db._lock: existing = db._conn.execute( "SELECT id FROM deepview_pinned_items " "WHERE user_id = ? AND context_id = ? AND title = ? AND snippet = ?", (userId, contextId, title, snippet) ).fetchone() if existing: return web.json_response({"duplicate": True, "id": existing["id"]}, headers=_CORS_HEADERS) def _do(conn): conn.execute( "INSERT INTO deepview_pinned_items (id, user_id, title, snippet, context_id, chat_id, created_at) " "VALUES (?, ?, ?, ?, ?, ?, ?)", (pinId, userId, title, snippet, contextId, chatId, time.time()) ) db._execute_write(_do) return web.json_response({"success": True, "id": pinId}, headers=_CORS_HEADERS) except Exception as e: logger.error("Failed to add pin: %s", e) return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS) async def _handlePinsRemove(self, request: "web.Request") -> "web.Response": """ POST /deepview/pins/remove Body: { id } """ user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) try: body = await request.json() except Exception: return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS) userId = user["userId"] pinId = body.get("id", "") if not pinId: return web.json_response({"error": "Missing id"}, status=400, headers=_CORS_HEADERS) try: from hermes_state import SessionDB db = SessionDB() def _do(conn): conn.execute( "DELETE FROM deepview_pinned_items WHERE id = ? AND user_id = ?", (pinId, userId) ) db._execute_write(_do) return web.json_response({"success": True}, headers=_CORS_HEADERS) except Exception as e: logger.error("Failed to remove pin: %s", e) return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS) # ────────────────────────────────────────────── # 素材文件持久化 # ────────────────────────────────────────────── # ★ 驨总基础知识库文件(预填充到 DB,统一管理) _DEEPVIEW_BASE_FILES = [ ("xz_0", "00_前言.md", "deepview"), ("xz_1", "01_第一章 品牌机会论证.md", "deepview"), ("xz_2", "02_第二章 原点市场与产品概念.md", "deepview"), ("xz_3", "03_第三章 品牌九要素之语言表达.md", "deepview"), ("xz_4", "04_第四章 品牌九要素之视觉表达.md", "deepview"), ("xz_5", "05_第五章 产品设计、广告与公关建品牌.md", "deepview"), ("xz_6", "06_第六章 新产品上市之品牌试错期.md", "deepview"), ("xz_7", "07_第七章 品牌的增长.md", "deepview"), ("xz_8", "08_第八章 连锁合作谈判.md", "deepview"), ("xz_9", "09_第九章 打赢商战.md", "deepview"), ] def _initMaterialsDb(self) -> None: """确保 deepview_materials 表存在并预填基础文件。""" try: from hermes_state import SessionDB db = SessionDB() with db._lock: db._conn.execute(""" CREATE TABLE IF NOT EXISTS deepview_materials ( id TEXT PRIMARY KEY, filename TEXT NOT NULL, context_id TEXT DEFAULT 'deepview', user_id TEXT, source TEXT DEFAULT 'base', created_at REAL NOT NULL ) """) db._conn.execute(""" CREATE INDEX IF NOT EXISTS idx_deepview_materials_ctx ON deepview_materials(context_id) """) # 预填基础知识库文件 for fid, fname, ctx in self._DEEPVIEW_BASE_FILES: db._conn.execute( "INSERT OR IGNORE INTO deepview_materials (id, filename, context_id, source, created_at) " "VALUES (?, ?, ?, 'base', ?)", (fid, fname, ctx, 0) # created_at=0 表示基础文件 ) db._conn.commit() logger.info("[DeepviewSSE] Materials table ready") except Exception as e: logger.warning("[DeepviewSSE] Failed to init materials table: %s", e) async def _handleMaterialsList(self, request: "web.Request") -> "web.Response": """ GET /deepview/materials/list?contextId=deepview 返回指定 context 下的所有素材文件(基础 + 用户上传)。 """ user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) contextId = request.query.get("contextId", "deepview") try: from hermes_state import SessionDB db = SessionDB() with db._lock: cursor = db._conn.execute( "SELECT id, filename, context_id, source " "FROM deepview_materials WHERE context_id = ? ORDER BY created_at ASC", (contextId,) ) rows = cursor.fetchall() files = [] for r in rows: row = dict(r) files.append({ "id": row["id"], "name": row["filename"], "projectId": row["context_id"], "source": row["source"], }) return web.json_response({"files": files}, headers=_CORS_HEADERS) except Exception as e: logger.error("Failed to list materials: %s", e) return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS) # ────────────────────────────────────────────── # 客户信息 API (Phase 4) # ────────────────────────────────────────────── async def _handleClientsList(self, request: "web.Request") -> "web.Response": """ GET /deepview/clients/list 返回所有客户记录列表。扫描 storage/clients 目录。 """ user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) userDir = self._getUserStorageDir(user.get("sub", "unknown")) clientsDir = os.path.join(userDir, "clients") clients = [] if os.path.exists(clientsDir): for d in os.listdir(clientsDir): clientPath = os.path.join(clientsDir, d) if os.path.isdir(clientPath): # 尝试读取 profile.md 以获取首行做名字(可选优化) # 默认使用目录名作为 ID 和名字 clients.append({ "id": d, "name": d, "hasProfile": os.path.exists(os.path.join(clientPath, "profile.md")) }) return web.json_response({"clients": clients}, headers=_CORS_HEADERS) async def _handleClientCreate(self, request: "web.Request") -> "web.Response": """ POST /deepview/clients/create 动态建档 """ user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) try: body = await request.json() except: return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS) name = body.get("name", "").strip() phone = body.get("phone", "").strip() if not name: return web.json_response({"error": "Name is required"}, status=400, headers=_CORS_HEADERS) import uuid import time clientId = f"p_{str(uuid.uuid4())[:8]}" userDir = self._getUserStorageDir(user.get("sub", "unknown")) clientDir = os.path.join(userDir, "clients", clientId) os.makedirs(clientDir, exist_ok=True) profileData = f"# 客户基本档案\n姓名:{name}\n手机号/尾号:{phone}\n建档时间:{time.strftime('%Y-%m-%d')}\n" with open(os.path.join(clientDir, "profile.md"), "w", encoding="utf-8") as f: f.write(profileData) return web.json_response({ "id": clientId, "name": name, "phone": phone }, headers=_CORS_HEADERS) async def _handleClientProfile(self, request: "web.Request") -> "web.Response": """ GET /deepview/clients/{id}/profile 读取并返回某个客户的 profile.md。 """ user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) clientId = request.match_info.get("id", "") if not clientId: return web.json_response({"error": "Missing client ID"}, status=400, headers=_CORS_HEADERS) userDir = self._getUserStorageDir(user.get("sub", "unknown")) profilePath = os.path.join(userDir, "clients", clientId, "profile.md") content = "" if os.path.exists(profilePath): with open(profilePath, "r", encoding="utf-8") as f: content = f.read() return web.json_response({ "id": clientId, "profileContent": content }, headers=_CORS_HEADERS) # ────────────────────────────────────────────── # 报告归档 (Inbox → Client) # ────────────────────────────────────────────── async def _handleReportArchive(self, request: "web.Request") -> "web.Response": """ POST /deepview/reports/archive Body: { reportId, clientId } 将 inbox 中的报告物理迁移到 clients/{clientId}/history/ 并更新 DB。 """ user = await self._extractUser(request) if not user: return web.json_response({"error": "Unauthorized"}, status=401, headers=_CORS_HEADERS) try: body = await request.json() except: return web.json_response({"error": "Invalid JSON"}, status=400, headers=_CORS_HEADERS) reportId = body.get("reportId", "").strip() clientId = body.get("clientId", "").strip() if not reportId or not clientId: return web.json_response({"error": "Missing reportId or clientId"}, status=400, headers=_CORS_HEADERS) userId = user["userId"] userDir = self._getUserStorageDir(user.get("sub", "unknown")) # 1. 物理文件迁移:users/{userId}/inbox/{reportId}/ → users/{userId}/clients/{clientId}/history/{reportId}/ import shutil inboxDir = os.path.join(userDir, "inbox", reportId) targetDir = os.path.join(userDir, "clients", clientId, "history", reportId) if os.path.exists(inboxDir): os.makedirs(os.path.dirname(targetDir), exist_ok=True) shutil.move(inboxDir, targetDir) logger.info(f"[DeepviewSSE] Archived {inboxDir} → {targetDir}") # 2. DB 更新:标记 client_id + 更新 context_id try: from hermes_state import SessionDB db = SessionDB() with db._lock: db._conn.execute( "UPDATE deepview_reports_v2 SET client_id=?, context_id=? WHERE report_id=? AND user_id=?", (clientId, f"recording:{clientId}/{reportId}", reportId, userId) ) db._conn.commit() logger.info(f"[DeepviewSSE] DB archived: {reportId} → client {clientId}") except Exception as e: logger.error(f"[DeepviewSSE] Archive DB update failed: {e}") return web.json_response({"error": str(e)}, status=500, headers=_CORS_HEADERS) return web.json_response({"success": True, "reportId": reportId, "clientId": clientId}, headers=_CORS_HEADERS) # ────────────────────────────────────────────── # 服务器生命周期 # ────────────────────────────────────────────── async def start(self) -> None: """启动 aiohttp 服务器。""" if not AIOHTTP_AVAILABLE: raise RuntimeError("aiohttp is required for DeepviewSSEServer") self._app = web.Application() # 注册路由 self._app.router.add_get("/deepview/health", self._handleHealth) self._app.router.add_get("/deepview/chat/history", self._handleHistory) self._app.router.add_get("/deepview/sessions/list", self._handleSessionsList) self._app.router.add_get("/deepview/events", self._handleSSEConnect) self._app.router.add_post("/deepview/chat", self._handleChat) self._app.router.add_post("/deepview/report/generate", self._handleReportGenerate) self._app.router.add_get("/deepview/report/get", self._handleReportGet) self._app.router.add_get("/deepview/reports/list", self._handleReportsList) self._app.router.add_get("/deepview/profile", self._handleProfileGet) self._app.router.add_post("/deepview/profile", self._handleProfilePost) self._app.router.add_post("/deepview/materials/upload_token", self._handleMaterialsUploadToken) self._app.router.add_post("/deepview/materials/confirm", self._handleMaterialsConfirm) self._app.router.add_get("/deepview/materials/list", self._handleMaterialsList) self._app.router.add_get("/deepview/pins/list", self._handlePinsList) self._app.router.add_post("/deepview/pins/add", self._handlePinsAdd) self._app.router.add_post("/deepview/pins/remove", self._handlePinsRemove) # 客户信息 APIs self._app.router.add_get("/deepview/clients/list", self._handleClientsList) self._app.router.add_post("/deepview/clients/create", self._handleClientCreate) self._app.router.add_get("/deepview/clients/{id}/profile", self._handleClientProfile) # 报告归档 API (Inbox → Client) self._app.router.add_post("/deepview/reports/archive", self._handleReportArchive) # OPTIONS 预检(CORS) self._app.router.add_route("OPTIONS", "/deepview/{path:.*}", self._handleOptions) self._runner = web.AppRunner(self._app) await self._runner.setup() site = web.TCPSite(self._runner, self._host, self._port) await site.start() devMode = os.environ.get("DEEPVIEW_AUTH_DEV_MODE", "") authMode = "dev mode (no auth)" if devMode else f"MindPass delegation ({MINDPASS_BASE_URL})" logger.info("[DeepviewSSE] Server started on http://%s:%d (auth: %s)", self._host, self._port, authMode) async def stop(self) -> None: """停止服务器。""" # 断开所有 SSE 客户端 for userId, conns in list(self._sseClients.items()): for connId, q in conns.items(): q.put_nowait(None) self._sseClients.clear() if self._runner: await self._runner.cleanup() logger.info("[DeepviewSSE] Server stopped") async def _handleOptions(self, request: "web.Request") -> "web.Response": """CORS 预检。""" return web.Response(status=200, headers=_CORS_HEADERS) # ── 独立启动入口 ── async def main(): """直接运行此文件来启动深维 SSE 服务器。""" import sys sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s") server = DeepviewSSEServer( host=os.getenv("DEEPVIEW_SSE_HOST", "127.0.0.1"), port=int(os.getenv("DEEPVIEW_SSE_PORT", "8643")), ) await server.start() # 保持运行 try: while True: await asyncio.sleep(3600) except KeyboardInterrupt: pass finally: await server.stop() if __name__ == "__main__": asyncio.run(main())