From ec8eaa0b365b79a7a80c4a349ad6b20e76f3e64e Mon Sep 17 00:00:00 2001 From: lidf Date: Tue, 7 Apr 2026 18:13:19 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=8B=AC=E7=AB=8B=E5=90=8E=E7=AB=AF?= =?UTF-8?q?=EF=BC=88=E5=85=B1=E4=BA=AB=20VOC=20=E6=95=B0=E6=8D=AE=E5=B1=82?= =?UTF-8?q?=20+=20=E8=87=AA=E6=9C=89=E5=88=86=E6=9E=90=E5=AD=98=E5=82=A8?= =?UTF-8?q?=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - backend/server.py: FastAPI 端口 8093 - backend/db.py: 双库设计(案例 DB 读写 + VOC DB 只读) - backend/tools/ude_extract.py: UDE 转写 + 向量聚类 - backend/prompts/voc_to_ude.txt: TOC 7条规范约束 - 已部署至 /opt/apps/mafia-proposal/ (systemd) - Nginx /copaw/mafia/api/ 代理已配置 --- .gitignore | 4 + backend/.env.example | 13 ++ backend/db.py | 167 +++++++++++++++++ backend/prompts/voc_to_ude.txt | 52 ++++++ backend/requirements.txt | 7 + backend/server.py | 217 ++++++++++++++++++++++ backend/tools/__init__.py | 8 + backend/tools/ude_extract.py | 325 +++++++++++++++++++++++++++++++++ 8 files changed, 793 insertions(+) create mode 100644 backend/.env.example create mode 100644 backend/db.py create mode 100644 backend/prompts/voc_to_ude.txt create mode 100644 backend/requirements.txt create mode 100644 backend/server.py create mode 100644 backend/tools/__init__.py create mode 100644 backend/tools/ude_extract.py diff --git a/.gitignore b/.gitignore index e3119f3..9b58fe2 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,10 @@ agent/memory/ agent/cases/ agent/iteration_reports/ +# 后端数据与密钥 +backend/data/ +backend/.env + # macOS .DS_Store diff --git a/backend/.env.example b/backend/.env.example new file mode 100644 index 0000000..e972ce0 --- /dev/null +++ b/backend/.env.example @@ -0,0 +1,13 @@ +# LLM(通过 LiteLLM 网关) +LITELLM_PROXY_URL=http://127.0.0.1:4000/v1 +LITELLM_MASTER_KEY= +MODEL_ID=qwen-plus + +# 向量化(DashScope text-embedding-v4) +DASHSCOPE_API_KEY= + +# 共享 VOC 数据层 +VOC_DATA_DIR=/opt/apps/voc-researcher/data + +# 服务 +PORT=8093 diff --git 
a/backend/db.py b/backend/db.py new file mode 100644 index 0000000..ce4a8ee --- /dev/null +++ b/backend/db.py @@ -0,0 +1,167 @@ +""" +黑手党提案 — 数据库管理 + +双库设计: + 1. 案例 DB(读写):每个提案案例一个 SQLite,存分析结果 + 2. VOC DB(只读):读取共享 VOC 数据层的原始评论 +""" +import os +import sqlite3 +import uuid +from pathlib import Path + +from dotenv import load_dotenv + +load_dotenv() + +DATA_DIR = Path(__file__).parent / "data" +DATA_DIR.mkdir(exist_ok=True) + +VOC_DATA_DIR = Path(os.getenv("VOC_DATA_DIR", "")) + + +# ═══════════ 案例 DB(读写) ═══════════ + +CASE_SCHEMA = """ +CREATE TABLE IF NOT EXISTS case_card ( + brand_name TEXT NOT NULL, + category TEXT, + focus_product TEXT, + competitors TEXT, + voc_research_id TEXT, + created_at TEXT DEFAULT (datetime('now')), + status TEXT DEFAULT 'draft' +); + +CREATE TABLE IF NOT EXISTS ude_sentences ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + voc_comment_id INTEGER, + ude_text TEXT NOT NULL, + confidence REAL DEFAULT 0.5, + vector TEXT, + cluster_id INTEGER DEFAULT -1, + created_at TEXT DEFAULT (datetime('now')) +); + +CREATE TABLE IF NOT EXISTS ude_clusters ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + representative_ude TEXT, + coverage INTEGER, + sample_voices TEXT, + user_label TEXT, + confirmed INTEGER DEFAULT 0 +); + +CREATE TABLE IF NOT EXISTS conflicts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ude_cluster_id INTEGER, + goal TEXT, + need TEXT, + prerequisite TEXT, + convention TEXT, + conflict_type TEXT, + description TEXT +); + +CREATE TABLE IF NOT EXISTS proposal_sections ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + section TEXT, + content TEXT, + version INTEGER DEFAULT 1, + updated_at TEXT DEFAULT (datetime('now')) +); +""" + + +def get_case_conn(case_id: str) -> sqlite3.Connection: + """获取案例 DB 连接(读写)""" + path = DATA_DIR / f"{case_id}.db" + if not path.exists(): + raise FileNotFoundError(f"案例 {case_id} 不存在") + conn = sqlite3.connect(str(path)) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + return conn + + +def 
init_case_db(brand_name: str, category: str = "", focus_product: str = "", + competitors: str = "[]", voc_research_id: str = None) -> str: + """创建新案例,返回 case_id""" + case_id = uuid.uuid4().hex[:8] + path = DATA_DIR / f"{case_id}.db" + conn = sqlite3.connect(str(path)) + conn.row_factory = sqlite3.Row + conn.executescript(CASE_SCHEMA) + conn.execute( + "INSERT INTO case_card (brand_name, category, focus_product, competitors, voc_research_id) VALUES (?,?,?,?,?)", + (brand_name, category, focus_product, competitors, voc_research_id) + ) + conn.commit() + conn.close() + return case_id + + +def list_cases() -> list[dict]: + """列出所有案例""" + cases = [] + for db_file in sorted(DATA_DIR.glob("*.db")): + case_id = db_file.stem + try: + conn = sqlite3.connect(str(db_file)) + conn.row_factory = sqlite3.Row + card = conn.execute("SELECT * FROM case_card LIMIT 1").fetchone() + if card: + ude_count = conn.execute("SELECT count(*) FROM ude_sentences").fetchone()[0] + cluster_count = conn.execute("SELECT count(*) FROM ude_clusters").fetchone()[0] + cases.append({ + "case_id": case_id, + **dict(card), + "ude_count": ude_count, + "cluster_count": cluster_count, + }) + conn.close() + except Exception: + pass + return cases + + +# ═══════════ VOC DB(只读) ═══════════ + +def get_voc_conn(voc_research_id: str) -> sqlite3.Connection: + """只读访问共享 VOC 数据""" + if not VOC_DATA_DIR.exists(): + raise FileNotFoundError(f"VOC 数据目录不存在: {VOC_DATA_DIR}") + path = VOC_DATA_DIR / f"{voc_research_id}.db" + if not path.exists(): + raise FileNotFoundError(f"VOC 研究 {voc_research_id} 不存在") + conn = sqlite3.connect(f"file:{path}?mode=ro", uri=True) + conn.row_factory = sqlite3.Row + return conn + + +def list_voc_researches() -> list[dict]: + """列出共享 VOC 数据层中的所有研究""" + if not VOC_DATA_DIR.exists(): + return [] + researches = [] + for db_file in sorted(VOC_DATA_DIR.glob("*.db")): + if db_file.name in ("global_cache.db", "agent_sessions.db"): + continue + rid = db_file.stem + try: + conn = 
sqlite3.connect(f"file:{db_file}?mode=ro", uri=True) + conn.row_factory = sqlite3.Row + card = conn.execute("SELECT brand_name FROM research_card LIMIT 1").fetchone() + comment_count = conn.execute( + "SELECT count(*) FROM comments WHERE length(text) > 10" + ).fetchone()[0] + conn.close() + if card and comment_count > 0: + researches.append({ + "research_id": rid, + "brand_name": card["brand_name"], + "comment_count": comment_count, + }) + except Exception: + pass + return researches diff --git a/backend/prompts/voc_to_ude.txt b/backend/prompts/voc_to_ude.txt new file mode 100644 index 0000000..92a93ac --- /dev/null +++ b/backend/prompts/voc_to_ude.txt @@ -0,0 +1,52 @@ +你是一个 TOC(约束理论)专家,你的任务是将消费者评论转写为 UDE(Undesirable Effect,不良效果)格式句。 + +## 什么是 UDE + +UDE = 系统中当前正在发生的、阻碍系统实现目标的、可观测的负面现象。 +UDE 是症状,不是病因,也不是解决方案。 + +## 转写规范(7 条硬约束) + +你输出的每条 UDE 必须同时满足以下全部规范,不满足则不输出: + +1. **完整陈述句**:必须是完整的句子,不能是碎片短语 +2. **现在时态**:描述当前正在发生的事 +3. **只描述效果,不含原因**:不能包含"因为…所以…"的因果分析 +4. **不是伪装的解决方案**:不能说"需要X"、"应该做Y" +5. **单一实体**:一条 UDE 只描述一个问题 +6. **客观可验证**:利益相关方能达成共识的事实 +7. **在影响范围内**:品牌/企业可以采取行动改善的 + +## 你的任务 + +对输入的每条消费者评论,判断其中是否包含不良效果。如果有,转写为 UDE 格式句;如果没有(纯分享、纯推荐、无关内容),输出 null。 + +## 输出格式 + +严格输出 JSON 数组,每个元素对应一条输入评论: + +```json +{ + "results": [ + {"id": 1, "ude": "该品类产品月均消费成本持续超出目标消费者的可接受范围", "confidence": 0.9}, + {"id": 2, "ude": null, "confidence": 0}, + {"id": 3, "ude": "消费者服用产品后持续无法感知明确效果变化", "confidence": 0.85} + ] +} +``` + +## 转写示例 + +| 消费者原文 | 正确的 UDE ✅ | 错误的写法 ❌ | +|-----------|-------------|-------------| +| "一瓶三百多,吃一个月,真的吃不起" | "该品类产品月均消费成本持续超出目标消费者的可接受范围" | "需要降价"(伪装的解决方案) | +| "吃了两个月了完全没感觉" | "消费者服用产品后持续无法感知明确效果变化" | "因为产品无效所以没感觉"(包含原因) | +| "需要冷藏但办公室没冰箱" | "产品冷藏存储要求与消费者日常携带场景持续冲突" | "应该出常温版"(伪装的解决方案) | +| "不知道该买哪个牌子好" | "消费者面对该品类众多品牌持续缺乏可信的决策依据" | "品牌多、选择困难"(碎片短语,非完整句) | +| "这个益生菌真的超好用推荐!" 
"""
Mafia proposal — standalone backend.

FastAPI service on port 8093.
Data source: read-only access to the shared VOC data layer.
Analysis results: stored in this project's own case DBs.
"""
import os
import logging

from fastapi import FastAPI, Header, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from dotenv import load_dotenv

load_dotenv()

from db import (
    get_case_conn, get_voc_conn, init_case_db,
    list_cases as _list_cases, list_voc_researches as _list_voc_researches,
)

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s")
logger = logging.getLogger("mafia")

app = FastAPI(title="黑手党提案后端", version="1.0.0", description="独立后端:共享 VOC 数据层 + 自有分析存储")
# NOTE(review): wildcard CORS — presumably acceptable because the service sits
# behind the Nginx /copaw/mafia/api/ proxy; confirm before exposing directly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


# ═══════════ Models ═══════════

class CreateCaseRequest(BaseModel):
    brandName: str
    category: str = ""
    focusProduct: str = ""
    competitors: str = "[]"
    # Fix: was `str = None`, which Pydantic v2 rejects when None is sent explicitly.
    vocResearchId: str | None = None


class LinkVocRequest(BaseModel):
    vocResearchId: str


# ═══════════ Case management ═══════════

@app.post("/api/cases")
async def create_case(req: CreateCaseRequest):
    """Create a new proposal case and return its id."""
    case_id = init_case_db(
        brand_name=req.brandName,
        category=req.category,
        focus_product=req.focusProduct,
        competitors=req.competitors,
        voc_research_id=req.vocResearchId,
    )
    return {"caseId": case_id}


@app.get("/api/cases")
async def get_cases():
    """List all cases with basic stats."""
    return _list_cases()


@app.get("/api/cases/{case_id}")
async def get_case(case_id: str):
    """Return one case's card plus UDE/cluster counts."""
    try:
        with get_case_conn(case_id) as conn:
            card = conn.execute("SELECT * FROM case_card LIMIT 1").fetchone()
            # Fix: check the card before running the counts (was counted first).
            if not card:
                raise HTTPException(404, "案例不存在")
            ude_count = conn.execute("SELECT count(*) FROM ude_sentences").fetchone()[0]
            cluster_count = conn.execute("SELECT count(*) FROM ude_clusters").fetchone()[0]
            return {"caseId": case_id, **dict(card), "udeCount": ude_count, "clusterCount": cluster_count}
    except FileNotFoundError:
        raise HTTPException(404, "案例不存在")


@app.delete("/api/cases/{case_id}")
async def delete_case(case_id: str):
    """Delete a case by removing its SQLite file."""
    from db import DATA_DIR
    path = DATA_DIR / f"{case_id}.db"
    if path.exists():
        path.unlink()
        return {"deleted": True}
    raise HTTPException(404, "案例不存在")


# ═══════════ VOC linkage ═══════════

@app.post("/api/cases/{case_id}/link-voc")
async def link_voc(case_id: str, req: LinkVocRequest):
    """Link a VOC research id to the case (verified to exist before writing)."""
    try:
        with get_voc_conn(req.vocResearchId) as voc:
            count = voc.execute(
                "SELECT count(*) FROM comments WHERE length(text) > 10 "
            ).fetchone()[0]
    except FileNotFoundError as e:
        raise HTTPException(404, str(e))

    try:
        with get_case_conn(case_id) as conn:
            conn.execute("UPDATE case_card SET voc_research_id = ?", (req.vocResearchId,))
            conn.commit()
    except FileNotFoundError:
        raise HTTPException(404, "案例不存在")

    return {"linked": True, "vocCommentCount": count}


@app.get("/api/voc/researches")
async def get_voc_researches():
    """List all researches available in the shared VOC data layer."""
    return _list_voc_researches()


@app.get("/api/cases/{case_id}/voc-comments")
async def get_voc_comments(case_id: str, page: int = 1, pageSize: int = 50):
    """Page through the raw comments of the linked VOC research (read-only)."""
    # Fix: clamp to 1 so page=0 / negative values cannot produce a negative
    # OFFSET or LIMIT, which sqlite rejects.
    page = max(1, page)
    pageSize = max(1, pageSize)

    try:
        with get_case_conn(case_id) as conn:
            card = conn.execute("SELECT voc_research_id FROM case_card LIMIT 1").fetchone()
    except FileNotFoundError:
        raise HTTPException(404, "案例不存在")

    if not card or not card["voc_research_id"]:
        raise HTTPException(400, "未关联 VOC 研究")

    try:
        with get_voc_conn(card["voc_research_id"]) as voc:
            total = voc.execute(
                "SELECT count(*) FROM comments WHERE length(text) > 10 "
            ).fetchone()[0]
            rows = voc.execute("""
                SELECT id, platform, text, like_count, published_at
                FROM comments WHERE length(text) > 10
                ORDER BY like_count DESC
                LIMIT ? OFFSET ?
            """, (pageSize, (page - 1) * pageSize)).fetchall()
    except FileNotFoundError as e:
        raise HTTPException(404, str(e))

    return {"total": total, "page": page, "items": [dict(r) for r in rows]}


# ═══════════ UDE analysis ═══════════

@app.post("/api/cases/{case_id}/ude/extract")
async def extract_ude(case_id: str, limit: int = Query(0)):
    """Run LLM transcription of VOC comments into UDE sentences."""
    from tools.ude_extract import run_ude_extraction
    try:
        result = await run_ude_extraction(case_id, limit)
    except FileNotFoundError as e:
        raise HTTPException(404, str(e))
    return result


@app.post("/api/cases/{case_id}/ude/cluster")
async def cluster_ude(
    case_id: str,
    eps: float = Query(0.25),
    minSamples: int = Query(3),
    x_dashscope_key: str | None = Header(None),
):
    """Embed UDE sentences and cluster them with DBSCAN.

    The DashScope key may come from the request header or fall back to .env.
    """
    from tools.ude_extract import run_clustering
    key = x_dashscope_key or os.getenv("DASHSCOPE_API_KEY", "")
    try:
        result = run_clustering(case_id, eps, minSamples, dashscope_key=key)
    except FileNotFoundError as e:
        raise HTTPException(404, str(e))
    return result


@app.get("/api/cases/{case_id}/ude/clusters")
async def get_clusters(case_id: str):
    """Return all UDE clusters, largest coverage first."""
    try:
        with get_case_conn(case_id) as conn:
            clusters = conn.execute(
                "SELECT * FROM ude_clusters ORDER BY coverage DESC"
            ).fetchall()
    except FileNotFoundError:
        raise HTTPException(404, "案例不存在")
    return [dict(r) for r in clusters]


@app.get("/api/cases/{case_id}/ude/coverage")
async def get_coverage(case_id: str):
    """Report how much of the VOC corpus the current clustering covers."""
    from tools.ude_extract import run_coverage_scan
    try:
        result = run_coverage_scan(case_id)
    except FileNotFoundError as e:
        raise HTTPException(404, str(e))
    return result
+ +# ═══════════ 健康检查 ═══════════ + +@app.get("/api/health") +async def health(): + from db import VOC_DATA_DIR, DATA_DIR + return { + "status": "ok", + "vocDataDir": str(VOC_DATA_DIR), + "vocDataExists": VOC_DATA_DIR.exists(), + "caseDataDir": str(DATA_DIR), + } + + +# ═══════════ 启动 ═══════════ + +if __name__ == "__main__": + import uvicorn + port = int(os.getenv("PORT", "8093")) + uvicorn.run(app, host="0.0.0.0", port=port) diff --git a/backend/tools/__init__.py b/backend/tools/__init__.py new file mode 100644 index 0000000..332eeba --- /dev/null +++ b/backend/tools/__init__.py @@ -0,0 +1,8 @@ +# Tools 注册表 +from tools.ude_extract import run_ude_extraction, run_clustering, run_coverage_scan + +__all__ = [ + "run_ude_extraction", + "run_clustering", + "run_coverage_scan", +] diff --git a/backend/tools/ude_extract.py b/backend/tools/ude_extract.py new file mode 100644 index 0000000..d55c639 --- /dev/null +++ b/backend/tools/ude_extract.py @@ -0,0 +1,325 @@ +""" +黑手党提案 — UDE 提取工具 + +流程:VOC 原始评论 → LLM 转写 UDE → DashScope 向量化 → DBSCAN 聚类 → 覆盖扫描 + +数据来源:只读访问共享 VOC 数据层 +分析结果:写入本项目的案例 DB +""" +from __future__ import annotations + +import json +import os +import asyncio +import logging +from pathlib import Path + +import numpy as np +from openai import OpenAI, AsyncOpenAI +from dotenv import load_dotenv + +load_dotenv() +logger = logging.getLogger(__name__) + +MODEL = os.getenv("MODEL_ID", "qwen-plus") +TEMPERATURE = float(os.getenv("TEMPERATURE", "0.1")) +BATCH_SIZE = 10 +CONCURRENCY = 5 +EMBED_DIM = 1024 +EMBED_BATCH_SIZE = 25 + +PROMPT_PATH = Path(__file__).parent.parent / "prompts" / "voc_to_ude.txt" + + +def _get_llm_client() -> AsyncOpenAI: + return AsyncOpenAI( + api_key=os.getenv("LITELLM_MASTER_KEY"), + base_url=os.getenv("LITELLM_PROXY_URL"), + ) + + +def _get_embed_client(key: str) -> OpenAI: + if not key: + raise ValueError("DashScope API Key 未配置。请通过 Header 或 .env 传入。") + return OpenAI( + api_key=key, + 
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", + ) + + +# ═══════════ Step 1: VOC → UDE 转写 ═══════════ + +async def _call_ude_llm(prompt: str, comments: list[dict]) -> list[dict]: + """单批 LLM 转写""" + client = _get_llm_client() + user_msg = "请将以下消费者评论转写为 UDE 格式句,返回 JSON:\n\n" + for c in comments: + user_msg += f"[{c['id']}] 平台:{c['platform']} 原文: \"{c['text'][:300]}\"\n\n" + + try: + resp = await client.chat.completions.create( + model=MODEL, + messages=[ + {"role": "system", "content": prompt}, + {"role": "user", "content": user_msg}, + ], + temperature=TEMPERATURE, + max_tokens=4000, + response_format={"type": "json_object"}, + ) + content = (resp.choices[0].message.content or "").strip() + parsed = json.loads(content) + if isinstance(parsed, dict): + for key in ("results", "data", "items", "udes"): + if key in parsed and isinstance(parsed[key], list): + return parsed[key] + if isinstance(parsed, list): + return parsed + return [] + except Exception as e: + logger.warning(f"[UDE] LLM 转写失败: {str(e)[:80]}") + return [] + + +async def _process_ude_batch(comments, prompt, semaphore): + async with semaphore: + return await _call_ude_llm(prompt, comments) + + +async def run_ude_extraction(case_id: str, limit: int = 0) -> dict: + """从共享 VOC 数据读取原始评论,转写为 UDE,存入案例 DB""" + from db import get_case_conn, get_voc_conn + + prompt = PROMPT_PATH.read_text("utf-8") if PROMPT_PATH.exists() else "" + if not prompt: + return {"error": "UDE 转写 prompt 未找到 (prompts/voc_to_ude.txt)"} + + with get_case_conn(case_id) as case_conn: + card = case_conn.execute("SELECT voc_research_id FROM case_card LIMIT 1").fetchone() + if not card or not card["voc_research_id"]: + return {"error": "未关联 VOC 研究。请先调用 link-voc。"} + + voc_research_id = card["voc_research_id"] + + # 获取已转写的 voc_comment_ids + done_ids = {r[0] for r in case_conn.execute( + "SELECT voc_comment_id FROM ude_sentences" + ).fetchall()} + + # 从 VOC DB 只读获取原始评论 + with get_voc_conn(voc_research_id) as voc_conn: + rows = 
voc_conn.execute(""" + SELECT id, platform, text + FROM comments + WHERE length(text) > 10 + ORDER BY id + """).fetchall() + + # 过滤已完成的 + pending = [r for r in rows if r["id"] not in done_ids] + if not pending: + with get_case_conn(case_id) as conn: + total = conn.execute("SELECT count(*) FROM ude_sentences").fetchone()[0] + return {"message": "全部已转写完成", "total_udes": total, "new": 0} + + if limit > 0: + pending = pending[:limit] + + # 切批 + batches = [] + for i in range(0, len(pending), BATCH_SIZE): + chunk = pending[i:i + BATCH_SIZE] + batches.append([{"id": r["id"], "platform": r["platform"], "text": r["text"]} for r in chunk]) + + semaphore = asyncio.Semaphore(CONCURRENCY) + tasks = [asyncio.create_task(_process_ude_batch(b, prompt, semaphore)) for b in batches] + all_results = await asyncio.gather(*tasks) + + # 写入案例 DB + ok = 0 + with get_case_conn(case_id) as case_conn: + for results in all_results: + for r in (results or []): + if not isinstance(r, dict): + continue + ude_text = r.get("ude") + if not ude_text: + continue + cid = r.get("id") + if not cid: + continue + try: + case_conn.execute( + "INSERT OR IGNORE INTO ude_sentences (voc_comment_id, ude_text, confidence) VALUES (?, ?, ?)", + (int(cid), ude_text, r.get("confidence", 0.5)) + ) + ok += 1 + except Exception as e: + logger.warning(f"[UDE] 写入失败 id={cid}: {e}") + case_conn.commit() + total = case_conn.execute("SELECT count(*) FROM ude_sentences").fetchone()[0] + + return { + "new_udes": ok, + "total_udes": total, + "total_voc_comments": len(rows), + "remaining": len(rows) - total, + "batches": len(batches), + } + + +# ═══════════ Step 2 & 3: 向量化 + 聚类 ═══════════ + +def _embed_texts(client: OpenAI, texts: list[str]) -> list[list[float]]: + all_vectors = [] + for i in range(0, len(texts), EMBED_BATCH_SIZE): + batch = texts[i:i + EMBED_BATCH_SIZE] + resp = client.embeddings.create(model="text-embedding-v4", input=batch, dimensions=EMBED_DIM) + all_vectors.extend([item.embedding for item in resp.data]) + 
return all_vectors + + +def run_clustering(case_id: str, eps: float = 0.25, min_samples: int = 3, + dashscope_key: str = None) -> dict: + """向量化 + DBSCAN 聚类""" + from sklearn.cluster import DBSCAN + from sklearn.metrics.pairwise import cosine_distances + from db import get_case_conn, get_voc_conn + + key = dashscope_key or os.getenv("DASHSCOPE_API_KEY", "") + if not key: + return {"error": "DashScope API Key 未配置。"} + + embed_client = _get_embed_client(key) + + with get_case_conn(case_id) as conn: + rows = conn.execute("SELECT id, voc_comment_id, ude_text FROM ude_sentences ORDER BY id").fetchall() + if len(rows) < min_samples: + return {"error": f"UDE 不足 ({len(rows)} 条),至少需要 {min_samples} 条。"} + + ude_texts = [r["ude_text"] for r in rows] + ude_ids = [r["id"] for r in rows] + comment_ids = [r["voc_comment_id"] for r in rows] + + # 向量化 + vectors = _embed_texts(embed_client, ude_texts) + vec_array = np.array(vectors) + + # 保存向量 + for i, uid in enumerate(ude_ids): + conn.execute("UPDATE ude_sentences SET vector = ? WHERE id = ?", + (json.dumps(vectors[i]), uid)) + + # DBSCAN + dist_matrix = cosine_distances(vec_array) + clustering = DBSCAN(eps=eps, min_samples=min_samples, metric="precomputed").fit(dist_matrix) + labels = clustering.labels_ + + # 更新聚类标签 + for i, uid in enumerate(ude_ids): + conn.execute("UPDATE ude_sentences SET cluster_id = ? 
WHERE id = ?", + (int(labels[i]), uid)) + + # 清空旧聚类,写入新聚类 + conn.execute("DELETE FROM ude_clusters") + + # 获取关联的 VOC research_id 用于读取原声 + card = conn.execute("SELECT voc_research_id FROM case_card LIMIT 1").fetchone() + voc_rid = card["voc_research_id"] if card else None + + clusters = [] + unique_labels = sorted(set(labels) - {-1}) + + for cluster_id in unique_labels: + member_indices = [i for i, l in enumerate(labels) if l == cluster_id] + member_texts = [ude_texts[i] for i in member_indices] + member_vectors = vec_array[member_indices] + member_cids = [comment_ids[i] for i in member_indices] + + # 簇中心 + centroid = member_vectors.mean(axis=0) + dists = cosine_distances([centroid], member_vectors)[0] + representative = member_texts[dists.argmin()] + + # 取原声 + sample_voices = [] + if voc_rid: + try: + voc_conn = get_voc_conn(voc_rid) + for cid in member_cids[:5]: + voice = voc_conn.execute( + "SELECT text, platform FROM comments WHERE id = ?", (cid,) + ).fetchone() + if voice: + sample_voices.append({"text": voice["text"][:200], "platform": voice["platform"]}) + voc_conn.close() + except Exception: + pass + + conn.execute( + "INSERT INTO ude_clusters (representative_ude, coverage, sample_voices) VALUES (?, ?, ?)", + (representative, len(member_indices), json.dumps(sample_voices, ensure_ascii=False)) + ) + clusters.append({ + "cluster_id": int(cluster_id), + "representative_ude": representative, + "coverage": len(member_indices), + "sample_voices": sample_voices, + }) + + conn.commit() + clusters.sort(key=lambda x: x["coverage"], reverse=True) + noise_count = int((labels == -1).sum()) + + return { + "total_udes": len(labels), + "num_clusters": len(clusters), + "noise_count": noise_count, + "noise_pct": round(noise_count / len(labels) * 100, 1) if len(labels) else 0, + "clusters": clusters, + "params": {"eps": eps, "min_samples": min_samples}, + } + + +# ═══════════ Step 5: 覆盖扫描 ═══════════ + +def run_coverage_scan(case_id: str) -> dict: + from db import 
get_case_conn, get_voc_conn + + with get_case_conn(case_id) as conn: + card = conn.execute("SELECT voc_research_id FROM case_card LIMIT 1").fetchone() + voc_rid = card["voc_research_id"] if card else None + + total_udes = conn.execute("SELECT count(*) FROM ude_sentences").fetchone()[0] + clustered = conn.execute("SELECT count(*) FROM ude_sentences WHERE cluster_id >= 0").fetchone()[0] + noise = conn.execute("SELECT count(*) FROM ude_sentences WHERE cluster_id = -1").fetchone()[0] + + cluster_stats = [dict(r) for r in conn.execute( + "SELECT cluster_id, count(*) as cnt FROM ude_sentences WHERE cluster_id >= 0 GROUP BY cluster_id ORDER BY cnt DESC" + ).fetchall()] + + noise_samples = [dict(r) for r in conn.execute( + "SELECT ude_text, voc_comment_id, confidence FROM ude_sentences WHERE cluster_id = -1 ORDER BY confidence DESC LIMIT 10" + ).fetchall()] + + total_voc = 0 + if voc_rid: + try: + with get_voc_conn(voc_rid) as voc: + total_voc = voc.execute( + "SELECT count(*) FROM comments WHERE length(text) > 10 " + ).fetchone()[0] + except Exception: + pass + + return { + "total_voc_comments": total_voc, + "total_udes": total_udes, + "udes_clustered": clustered, + "udes_noise": noise, + "coverage_rate": round(clustered / total_voc * 100, 1) if total_voc else 0, + "cluster_distribution": cluster_stats, + "noise_samples": noise_samples, + "verdict": "充分" if (total_udes > 0 and noise / total_udes < 0.1) else + ("需关注" if (total_udes > 0 and noise / total_udes < 0.2) else "需调参"), + }