""" 黑手党提案 — 数据库管理 双库设计: 1. 案例 DB(读写):每个提案案例一个 SQLite,存分析结果 2. VOC DB(只读):读取共享 VOC 数据层的原始评论 """ import os import sqlite3 import uuid from pathlib import Path from dotenv import load_dotenv load_dotenv() DATA_DIR = Path(__file__).parent / "data" DATA_DIR.mkdir(exist_ok=True) VOC_DATA_DIR = Path(os.getenv("VOC_DATA_DIR", "")) # ═══════════ 案例 DB(读写) ═══════════ CASE_SCHEMA = """ CREATE TABLE IF NOT EXISTS case_card ( brand_name TEXT NOT NULL, category TEXT, focus_product TEXT, competitors TEXT, voc_research_id TEXT, created_at TEXT DEFAULT (datetime('now')), status TEXT DEFAULT 'draft' ); CREATE TABLE IF NOT EXISTS ude_sentences ( id INTEGER PRIMARY KEY AUTOINCREMENT, voc_comment_id INTEGER, ude_text TEXT NOT NULL, confidence REAL DEFAULT 0.5, vector TEXT, cluster_id INTEGER DEFAULT -1, created_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS ude_clusters ( id INTEGER PRIMARY KEY AUTOINCREMENT, representative_ude TEXT, coverage INTEGER, sample_voices TEXT, user_label TEXT, confirmed INTEGER DEFAULT 0 ); CREATE TABLE IF NOT EXISTS conflicts ( id INTEGER PRIMARY KEY AUTOINCREMENT, ude_cluster_id INTEGER, goal TEXT, need TEXT, prerequisite TEXT, convention TEXT, conflict_type TEXT, description TEXT ); CREATE TABLE IF NOT EXISTS proposal_sections ( id INTEGER PRIMARY KEY AUTOINCREMENT, section TEXT, content TEXT, version INTEGER DEFAULT 1, updated_at TEXT DEFAULT (datetime('now')) ); """ def get_case_conn(case_id: str) -> sqlite3.Connection: """获取案例 DB 连接(读写)""" path = DATA_DIR / f"{case_id}.db" if not path.exists(): raise FileNotFoundError(f"案例 {case_id} 不存在") conn = sqlite3.connect(str(path)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") return conn def init_case_db(brand_name: str, category: str = "", focus_product: str = "", competitors: str = "[]", voc_research_id: str = None) -> str: """创建新案例,返回 case_id""" case_id = uuid.uuid4().hex[:8] path = DATA_DIR / f"{case_id}.db" conn = sqlite3.connect(str(path)) conn.row_factory = sqlite3.Row conn.executescript(CASE_SCHEMA) conn.execute( "INSERT INTO case_card (brand_name, category, focus_product, competitors, voc_research_id) VALUES (?,?,?,?,?)", (brand_name, category, focus_product, competitors, voc_research_id) ) conn.commit() conn.close() return case_id def list_cases() -> list[dict]: """列出所有案例""" cases = [] for db_file in sorted(DATA_DIR.glob("*.db")): case_id = db_file.stem try: conn = sqlite3.connect(str(db_file)) conn.row_factory = sqlite3.Row card = conn.execute("SELECT * FROM case_card LIMIT 1").fetchone() if card: ude_count = conn.execute("SELECT count(*) FROM ude_sentences").fetchone()[0] cluster_count = conn.execute("SELECT count(*) FROM ude_clusters").fetchone()[0] cases.append({ "case_id": case_id, **dict(card), "ude_count": ude_count, "cluster_count": cluster_count, }) conn.close() except Exception: pass return cases # ═══════════ VOC DB(只读) ═══════════ def get_voc_conn(voc_research_id: str) -> sqlite3.Connection: """只读访问共享 VOC 数据""" if not VOC_DATA_DIR.exists(): raise FileNotFoundError(f"VOC 数据目录不存在: {VOC_DATA_DIR}") path = VOC_DATA_DIR / f"{voc_research_id}.db" if not path.exists(): raise FileNotFoundError(f"VOC 研究 {voc_research_id} 不存在") conn = sqlite3.connect(f"file:{path}?mode=ro", uri=True) conn.row_factory = sqlite3.Row return conn def list_voc_researches() -> list[dict]: """列出共享 VOC 数据层中的所有研究""" if not VOC_DATA_DIR.exists(): return [] researches = [] for db_file in sorted(VOC_DATA_DIR.glob("*.db")): if db_file.name in ("global_cache.db", "agent_sessions.db"): continue rid = db_file.stem try: conn = sqlite3.connect(f"file:{db_file}?mode=ro", uri=True) conn.row_factory = sqlite3.Row card = conn.execute("SELECT brand_name FROM research_card LIMIT 1").fetchone() comment_count = conn.execute( "SELECT count(*) FROM comments WHERE length(text) > 10" ).fetchone()[0] conn.close() if card and comment_count > 0: researches.append({ "research_id": rid, "brand_name": card["brand_name"], "comment_count": comment_count, }) except Exception: pass return researches