""" 黑手党提案 — 数据库管理(完全独立,阿里云内闭环) 每个提案案例一个 SQLite 文件,自包含所有数据: - case_card:案例元信息 - comments:从 VOC API 导入的评论(本地副本) - ude_sentences / ude_clusters:UDE 分析结果 - conflicts / proposal_sections:后续阶段 """ import os import sqlite3 import uuid from pathlib import Path from dotenv import load_dotenv load_dotenv() DATA_DIR = Path(__file__).parent / "data" DATA_DIR.mkdir(exist_ok=True) # VOC 公网 API(腾讯云,跨云只读访问) VOC_API_BASE = os.getenv("VOC_API_BASE", "https://brand.brainwork.club/voc/api/research") # ═══════════ Schema ═══════════ CASE_SCHEMA = """ CREATE TABLE IF NOT EXISTS case_card ( brand_name TEXT NOT NULL, category TEXT, focus_product TEXT, competitors TEXT, voc_research_id TEXT, voc_api_base TEXT, created_at TEXT DEFAULT (datetime('now')), status TEXT DEFAULT 'draft' ); CREATE TABLE IF NOT EXISTS comments ( id INTEGER PRIMARY KEY AUTOINCREMENT, voc_id INTEGER, platform TEXT, text TEXT NOT NULL, like_count INTEGER DEFAULT 0, published_at TEXT, imported_at TEXT DEFAULT (datetime('now')), UNIQUE(voc_id) ); CREATE TABLE IF NOT EXISTS ude_sentences ( id INTEGER PRIMARY KEY AUTOINCREMENT, comment_id INTEGER REFERENCES comments(id), ude_text TEXT NOT NULL, confidence REAL DEFAULT 0.5, vector TEXT, cluster_id INTEGER DEFAULT -1, created_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS ude_clusters ( id INTEGER PRIMARY KEY AUTOINCREMENT, representative_ude TEXT, coverage INTEGER, sample_voices TEXT, user_label TEXT, confirmed INTEGER DEFAULT 0 ); CREATE TABLE IF NOT EXISTS conflicts ( id INTEGER PRIMARY KEY AUTOINCREMENT, ude_cluster_id INTEGER, goal TEXT, need TEXT, prerequisite TEXT, convention TEXT, conflict_type TEXT, description TEXT ); CREATE TABLE IF NOT EXISTS proposal_sections ( id INTEGER PRIMARY KEY AUTOINCREMENT, section TEXT, content TEXT, version INTEGER DEFAULT 1, updated_at TEXT DEFAULT (datetime('now')) ); """ # ═══════════ 案例 DB ═══════════ def get_case_conn(case_id: str) -> sqlite3.Connection: """获取案例 DB 连接""" path = DATA_DIR / f"{case_id}.db" if not path.exists(): raise FileNotFoundError(f"案例 {case_id} 不存在") conn = sqlite3.connect(str(path)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") return conn def init_case_db(brand_name: str, category: str = "", focus_product: str = "", competitors: str = "[]", voc_research_id: str = None) -> str: """创建新案例,返回 case_id""" case_id = uuid.uuid4().hex[:8] path = DATA_DIR / f"{case_id}.db" conn = sqlite3.connect(str(path)) conn.row_factory = sqlite3.Row conn.executescript(CASE_SCHEMA) conn.execute( "INSERT INTO case_card (brand_name, category, focus_product, competitors, voc_research_id, voc_api_base) VALUES (?,?,?,?,?,?)", (brand_name, category, focus_product, competitors, voc_research_id, VOC_API_BASE) ) conn.commit() conn.close() return case_id def list_cases() -> list[dict]: """列出所有案例""" cases = [] for db_file in sorted(DATA_DIR.glob("*.db")): case_id = db_file.stem try: conn = sqlite3.connect(str(db_file)) conn.row_factory = sqlite3.Row card = conn.execute("SELECT * FROM case_card LIMIT 1").fetchone() if card: comment_count = conn.execute("SELECT count(*) FROM comments").fetchone()[0] ude_count = conn.execute("SELECT count(*) FROM ude_sentences").fetchone()[0] cluster_count = conn.execute("SELECT count(*) FROM ude_clusters").fetchone()[0] cases.append({ "case_id": case_id, **dict(card), "comment_count": comment_count, "ude_count": ude_count, "cluster_count": cluster_count, }) conn.close() except Exception: pass return cases