chanpinhsd/backend/db.py
lidf c5e2a58258 refactor: v2.0 完全解耦 — 阿里云内闭环
- 删除 VOC_DATA_DIR / get_voc_conn(不再跨云直读 SQLite)
- 案例 DB 自带 comments 表,自包含所有数据
- 新增 POST /import-voc:通过 VOC 公网 API 导入评论
- VOC_API_BASE 环境变量控制 API 地址
- 新增 httpx 依赖
2026-04-07 19:47:34 +08:00

145 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
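Mafia Offer proposal — database management (fully standalone; closed loop inside Alibaba Cloud).

Each proposal case has its own SQLite file that contains all of its data:
- case_card: case metadata
- comments: comments imported from the VOC API (local copy)
- ude_sentences / ude_clusters: UDE analysis results
- conflicts / proposal_sections: later stages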
"""
import logging
import os
import sqlite3
import uuid
from pathlib import Path

from dotenv import load_dotenv
# Load settings from a .env file (if any) before the os.getenv lookup below.
load_dotenv()
# Directory that holds one SQLite file per case (<case_id>.db); it lives next to
# this module and is created at import time if it does not exist yet.
DATA_DIR = Path(__file__).parent / "data"
DATA_DIR.mkdir(exist_ok=True)
# Base URL of the VOC public API (Tencent Cloud; cross-cloud, read-only access).
# Can be overridden with the VOC_API_BASE environment variable.
VOC_API_BASE = os.getenv("VOC_API_BASE", "https://brand.brainwork.club/voc/api/research")
# ═══════════ Schema ═══════════
# DDL script applied to every newly created case database (see init_case_db).
# Every statement uses CREATE TABLE IF NOT EXISTS, so re-running the script on
# an existing database does not fail. Tables:
#   case_card         - case metadata row (brand, category, VOC source, status)
#   comments          - imported comments; voc_id is UNIQUE
#   ude_sentences     - UDE sentences, each referencing comments(id)
#   ude_clusters      - clusters of UDE sentences
#   conflicts         - conflict records keyed by ude_cluster_id
#   proposal_sections - versioned proposal text sections
CASE_SCHEMA = """
CREATE TABLE IF NOT EXISTS case_card (
brand_name TEXT NOT NULL,
category TEXT,
focus_product TEXT,
competitors TEXT,
voc_research_id TEXT,
voc_api_base TEXT,
created_at TEXT DEFAULT (datetime('now')),
status TEXT DEFAULT 'draft'
);
CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
voc_id INTEGER,
platform TEXT,
text TEXT NOT NULL,
like_count INTEGER DEFAULT 0,
published_at TEXT,
imported_at TEXT DEFAULT (datetime('now')),
UNIQUE(voc_id)
);
CREATE TABLE IF NOT EXISTS ude_sentences (
id INTEGER PRIMARY KEY AUTOINCREMENT,
comment_id INTEGER REFERENCES comments(id),
ude_text TEXT NOT NULL,
confidence REAL DEFAULT 0.5,
vector TEXT,
cluster_id INTEGER DEFAULT -1,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS ude_clusters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
representative_ude TEXT,
coverage INTEGER,
sample_voices TEXT,
user_label TEXT,
confirmed INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS conflicts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ude_cluster_id INTEGER,
goal TEXT,
need TEXT,
prerequisite TEXT,
convention TEXT,
conflict_type TEXT,
description TEXT
);
CREATE TABLE IF NOT EXISTS proposal_sections (
id INTEGER PRIMARY KEY AUTOINCREMENT,
section TEXT,
content TEXT,
version INTEGER DEFAULT 1,
updated_at TEXT DEFAULT (datetime('now'))
);
"""
# ═══════════ 案例 DB ═══════════
def get_case_conn(case_id: str) -> sqlite3.Connection:
    """Open a connection to an existing case database.

    Args:
        case_id: Identifier of the case; its database is expected at
            ``DATA_DIR/<case_id>.db``.

    Returns:
        A connection that yields ``sqlite3.Row`` rows and has had
        ``PRAGMA journal_mode=WAL`` issued on it.

    Raises:
        FileNotFoundError: If no database file exists for ``case_id``.
    """
    db_path = DATA_DIR / f"{case_id}.db"
    if db_path.exists():
        connection = sqlite3.connect(str(db_path))
        # Rows support access by column name as well as by index.
        connection.row_factory = sqlite3.Row
        connection.execute("PRAGMA journal_mode=WAL")
        return connection
    raise FileNotFoundError(f"案例 {case_id} 不存在")
def init_case_db(brand_name: str, category: str = "", focus_product: str = "",
                 competitors: str = "[]", voc_research_id: str = None) -> str:
    """Create a new case database and return its ``case_id``.

    A fresh SQLite file ``DATA_DIR/<case_id>.db`` is created, the schema in
    ``CASE_SCHEMA`` is applied, and a single ``case_card`` row is inserted
    with the given values plus the current ``VOC_API_BASE``.

    Args:
        brand_name: Brand the case is about (required by the schema).
        category: Product category; stored as given.
        focus_product: Product the case focuses on; stored as given.
        competitors: Stored as text; the default ``"[]"`` suggests a
            JSON-encoded list (not validated here).
        voc_research_id: Identifier of the related VOC research, or ``None``.

    Returns:
        The new case id: 8 lowercase hexadecimal characters.

    Raises:
        sqlite3.Error: If the schema or the insert fails. In that case the
            partially created database file is removed again.
    """
    # An 8-hex-digit id can collide with an existing case. Because the schema
    # uses IF NOT EXISTS, a collision would silently append a second
    # case_card row to someone else's database, so pick an unused id.
    while True:
        case_id = uuid.uuid4().hex[:8]
        path = DATA_DIR / f"{case_id}.db"
        if not path.exists():
            break

    conn = sqlite3.connect(str(path))
    created = False
    try:
        conn.executescript(CASE_SCHEMA)
        conn.execute(
            "INSERT INTO case_card (brand_name, category, focus_product, competitors, voc_research_id, voc_api_base) VALUES (?,?,?,?,?,?)",
            (brand_name, category, focus_product, competitors, voc_research_id, VOC_API_BASE)
        )
        conn.commit()
        created = True
    finally:
        # Always release the connection, even when initialisation fails.
        conn.close()
        if not created:
            # Do not leave a half-initialised case file behind: it would be
            # opened by get_case_conn but lack tables or its case_card row.
            path.unlink(missing_ok=True)
    return case_id
def list_cases() -> list[dict]:
    """List every case found in ``DATA_DIR``.

    Each ``*.db`` file is opened in file-name order. For a file with a
    ``case_card`` row, the result contains one dict made of ``case_id`` (the
    file stem), all ``case_card`` columns, and the row counts of ``comments``,
    ``ude_sentences`` and ``ude_clusters``.

    Files that cannot be read as a case database (corrupt file, missing
    tables, ...) are skipped with a logged warning so that one bad file does
    not break the whole listing.

    Returns:
        A list of case summary dicts; empty if there are no readable cases.
    """
    cases = []
    for db_file in sorted(DATA_DIR.glob("*.db")):
        case_id = db_file.stem
        conn = None
        try:
            conn = sqlite3.connect(str(db_file))
            conn.row_factory = sqlite3.Row
            card = conn.execute("SELECT * FROM case_card LIMIT 1").fetchone()
            if not card:
                # Database without a case_card row: nothing to report.
                continue
            comment_count = conn.execute("SELECT count(*) FROM comments").fetchone()[0]
            ude_count = conn.execute("SELECT count(*) FROM ude_sentences").fetchone()[0]
            cluster_count = conn.execute("SELECT count(*) FROM ude_clusters").fetchone()[0]
            cases.append({
                "case_id": case_id,
                **dict(card),
                "comment_count": comment_count,
                "ude_count": ude_count,
                "cluster_count": cluster_count,
            })
        except sqlite3.Error as exc:
            # Best effort: skip the file, but leave a trace instead of
            # swallowing the error silently.
            logging.getLogger(__name__).warning(
                "Skipping unreadable case database %s: %s", db_file, exc
            )
        finally:
            # Runs on success, on `continue` and on error, so the connection
            # is never leaked.
            if conn is not None:
                conn.close()
    return cases