# SQLite-backed index of documents, jobs and media assets.
#
# The module exposes a few pure helpers for deriving document attributes from a
# job dict, the ``AppDatabase`` class that mirrors job state into SQLite, and
# the ``create_database`` factory that builds an initialised ``AppDatabase``.
from __future__ import annotations

import json
import os
import sqlite3
import time
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import Any

SCHEMA_VERSION = 1

# DDL executed by ``AppDatabase.init``. Every statement is idempotent
# (``IF NOT EXISTS``), so running it against an existing database is safe.
_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_meta (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS documents (
    id TEXT PRIMARY KEY,
    title TEXT NOT NULL,
    source_kind TEXT NOT NULL,
    workflow_mode TEXT NOT NULL,
    source_url TEXT NOT NULL DEFAULT '',
    primary_job_id TEXT NOT NULL DEFAULT '',
    status TEXT NOT NULL DEFAULT 'created',
    storage_prefix TEXT NOT NULL,
    metadata_json TEXT NOT NULL DEFAULT '{}',
    created_at REAL NOT NULL,
    updated_at REAL NOT NULL
);

CREATE TABLE IF NOT EXISTS jobs (
    id TEXT PRIMARY KEY,
    document_id TEXT NOT NULL,
    source_kind TEXT NOT NULL,
    workflow_mode TEXT NOT NULL,
    source_url TEXT NOT NULL DEFAULT '',
    status TEXT NOT NULL,
    progress INTEGER NOT NULL DEFAULT 0,
    message TEXT NOT NULL DEFAULT '',
    storage_path TEXT NOT NULL,
    state_path TEXT NOT NULL,
    video_url TEXT NOT NULL DEFAULT '',
    duration REAL NOT NULL DEFAULT 0,
    width INTEGER NOT NULL DEFAULT 0,
    height INTEGER NOT NULL DEFAULT 0,
    frame_count INTEGER NOT NULL DEFAULT 0,
    video_count INTEGER NOT NULL DEFAULT 0,
    error TEXT NOT NULL DEFAULT '',
    metadata_json TEXT NOT NULL DEFAULT '{}',
    created_at REAL NOT NULL,
    updated_at REAL NOT NULL,
    FOREIGN KEY(document_id) REFERENCES documents(id) ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS media_assets (
    id TEXT PRIMARY KEY,
    document_id TEXT NOT NULL,
    job_id TEXT NOT NULL,
    kind TEXT NOT NULL,
    role TEXT NOT NULL,
    path TEXT NOT NULL DEFAULT '',
    url TEXT NOT NULL DEFAULT '',
    frame_index INTEGER,
    timestamp REAL,
    width INTEGER NOT NULL DEFAULT 0,
    height INTEGER NOT NULL DEFAULT 0,
    duration REAL NOT NULL DEFAULT 0,
    metadata_json TEXT NOT NULL DEFAULT '{}',
    created_at REAL NOT NULL,
    updated_at REAL NOT NULL,
    FOREIGN KEY(document_id) REFERENCES documents(id) ON DELETE CASCADE,
    FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_documents_updated_at ON documents(updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_documents_source_kind ON documents(source_kind);
CREATE INDEX IF NOT EXISTS idx_documents_workflow_mode ON documents(workflow_mode);
CREATE INDEX IF NOT EXISTS idx_jobs_document_id ON jobs(document_id);
CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_assets_document_id ON media_assets(document_id);
CREATE INDEX IF NOT EXISTS idx_assets_job_id ON media_assets(job_id);
CREATE INDEX IF NOT EXISTS idx_assets_role ON media_assets(role);
"""


def default_database_url(jobs_dir: Path) -> str:
    """Return the database URL to use when none is passed explicitly.

    ``APP_DB_URL`` wins over ``DATABASE_URL``; if neither environment variable
    is set (or both are empty) a SQLite file ``app.db`` inside *jobs_dir* is used.
    """
    return (
        os.getenv("APP_DB_URL")
        or os.getenv("DATABASE_URL")
        or f"sqlite:///{jobs_dir / 'app.db'}"
    )


def redact_database_url(url: str) -> str:
    """Return *url* with everything before the last ``@`` replaced by ``***``.

    URLs without a ``://`` separator or without an ``@`` are returned unchanged.
    """
    if "://" not in url or "@" not in url:
        return url
    scheme, rest = url.split("://", 1)
    _, host = rest.rsplit("@", 1)
    return f"{scheme}://***@{host}"


def infer_source_kind(url: str) -> str:
    """Classify a job source URL as ``upload``, ``tiktok_link`` or ``unknown``."""
    if url.startswith("upload://"):
        return "upload"
    if url.startswith(("http://", "https://")):
        return "tiktok_link"
    return "unknown"


def default_workflow_mode(source_kind: str) -> str:
    """Return the workflow mode implied by *source_kind*."""
    if source_kind == "upload":
        return "uploaded_reference"
    return "feed_recreation"


def document_title(url: str, source_kind: str, fallback: str) -> str:
    """Derive a display title for a document.

    Uploads use the URL with its first ``upload://`` occurrence removed; other
    sources use the stripped URL truncated to 120 characters. *fallback* is
    returned when nothing usable remains.
    """
    if source_kind == "upload":
        return url.replace("upload://", "", 1).strip() or fallback
    if url:
        return url.strip()[:120]
    return fallback


def storage_prefix(document_id: str, source_kind: str, workflow_mode: str) -> str:
    """Build the ``<mode>/<source>/<document_id>`` storage prefix.

    An empty *source_kind* becomes ``unknown``; an empty *workflow_mode* is
    derived from the source kind.
    """
    source = source_kind or "unknown"
    mode = workflow_mode or default_workflow_mode(source)
    return f"{mode}/{source}/{document_id}"


class AppDatabase:
    """Mirror of job state in a SQLite database (documents, jobs, media assets).

    Only ``sqlite:///<path>`` URLs and the literal ``:memory:`` are accepted.
    """

    # Shared connection used only for ``:memory:`` databases. A new connection
    # to ``:memory:`` always opens a fresh, empty database, so the same
    # connection must be reused for the schema created by ``init`` to survive.
    _memory_conn: sqlite3.Connection | None = None

    def __init__(self, url: str, jobs_dir: Path):
        """Store the configuration and resolve the SQLite path.

        Raises:
            RuntimeError: if *url* is neither ``:memory:`` nor ``sqlite:///...``.
        """
        self.url = url
        self.jobs_dir = jobs_dir
        self.path = self._sqlite_path(url)
        self.enabled = True
        self.error = ""
        self._memory_conn = None

    @staticmethod
    def _sqlite_path(url: str) -> Path:
        """Translate a database URL into a filesystem path.

        ``:memory:`` is passed through as ``Path(":memory:")``; any other value
        must start with ``sqlite:///`` and is expanded and resolved.
        """
        if url == ":memory:":
            return Path(":memory:")
        if not url.startswith("sqlite:///"):
            # Message (zh): the built-in database layer only supports sqlite:///
            # URLs; a Postgres migration would reuse the same table semantics.
            raise RuntimeError("当前内置数据库层只支持 sqlite:/// URL;Postgres 迁移会复用同一张表语义。")
        raw = url[len("sqlite:///"):]
        return Path(raw).expanduser().resolve()

    @staticmethod
    def _configure(conn: sqlite3.Connection) -> sqlite3.Connection:
        """Apply the per-connection settings every query in this class relies on."""
        conn.row_factory = sqlite3.Row
        # Needed for the ON DELETE CASCADE clauses in the schema to take effect.
        conn.execute("PRAGMA foreign_keys = ON")
        return conn

    def _is_memory(self) -> bool:
        """Return True when this instance targets an in-memory database."""
        return str(self.path) == ":memory:"

    def connect(self) -> sqlite3.Connection:
        """Return a configured connection to the database.

        File-backed databases get a new connection per call (the parent
        directory is created on demand) and the caller owns closing it.
        In-memory databases return one shared connection that must stay open,
        otherwise the data is lost.
        """
        if self._is_memory():
            if self._memory_conn is None:
                # check_same_thread=False lets the shared connection be used
                # from a thread other than the one that created it.
                # NOTE(review): concurrent use still needs external
                # serialisation - confirm how callers use threads.
                self._memory_conn = self._configure(
                    sqlite3.connect(":memory:", check_same_thread=False)
                )
            return self._memory_conn
        self.path.parent.mkdir(parents=True, exist_ok=True)
        return self._configure(sqlite3.connect(str(self.path)))

    @contextmanager
    def _session(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection inside a transaction and release it afterwards.

        ``with conn:`` on a sqlite3 connection commits or rolls back but does
        not close it, so the connection is closed here explicitly. The shared
        in-memory connection is left open.
        """
        conn = self.connect()
        try:
            with conn:  # commit on success, roll back on exception
                yield conn
        finally:
            if conn is not self._memory_conn:
                conn.close()

    def close(self) -> None:
        """Close the shared in-memory connection, if any.

        File-backed databases hold no long-lived connection, so this is a
        no-op for them. Closing an in-memory database discards its contents.
        """
        conn, self._memory_conn = self._memory_conn, None
        if conn is not None:
            conn.close()

    def init(self) -> None:
        """Create tables and indexes if missing and record the schema version."""
        with self._session() as conn:
            conn.executescript(_SCHEMA_SQL)
            conn.execute(
                "INSERT OR REPLACE INTO schema_meta(key, value) VALUES('schema_version', ?)",
                (str(SCHEMA_VERSION),),
            )

    def normalize_job_document(self, job: dict[str, Any]) -> dict[str, Any]:
        """Fill in the document-level attributes of *job* with defaults.

        Missing values are derived as follows: source kind from the URL,
        workflow mode from the source kind, document id from the job id, and
        storage prefix from the other three.

        Returns:
            A dict with ``document_id``, ``source_kind``, ``workflow_mode``,
            ``storage_prefix`` and ``title``.
        """
        job_id = str(job.get("id") or "")
        source_url = str(job.get("url") or "")
        source_kind = str(job.get("source_kind") or "") or infer_source_kind(source_url)
        workflow_mode = str(job.get("workflow_mode") or "") or default_workflow_mode(source_kind)
        document_id = str(job.get("document_id") or "") or job_id
        prefix = str(job.get("storage_prefix") or "") or storage_prefix(
            document_id, source_kind, workflow_mode
        )
        return {
            "document_id": document_id,
            "source_kind": source_kind,
            "workflow_mode": workflow_mode,
            "storage_prefix": prefix,
            "title": document_title(source_url, source_kind, document_id),
        }

    def sync_job(self, job: dict[str, Any], job_path: Path) -> None:
        """Upsert *job*, its document and its media assets in one transaction.

        Does nothing when the database is disabled or the job has no id.
        Existing asset rows of the job are replaced by the assets derived from
        the current job dict.
        """
        if not self.enabled:
            return
        job_id = str(job.get("id") or "")
        if not job_id:
            return

        now = time.time()
        doc = self.normalize_job_document(job)
        metadata_json = json.dumps(
            {
                "audio_segment_count": len(job.get("transcript") or []),
                "product_ref_count": len(job.get("product_refs") or []),
                "storyboard_image_count": len(job.get("storyboard_images") or []),
            },
            ensure_ascii=False,
        )
        with self._session() as conn:
            self._upsert_document(conn, job, job_id, doc, metadata_json, now)
            self._upsert_job(conn, job, job_id, job_path, doc, metadata_json, now)
            self._replace_assets(conn, job, job_id, job_path, doc["document_id"], now)

    @staticmethod
    def _upsert_document(
        conn: sqlite3.Connection,
        job: dict[str, Any],
        job_id: str,
        doc: dict[str, Any],
        metadata_json: str,
        now: float,
    ) -> None:
        """Insert the document row for *job* or refresh the existing one."""
        # created_at is absent from the DO UPDATE list, so an existing row
        # keeps its original creation time; `now` only applies to new rows.
        conn.execute(
            """
            INSERT INTO documents(
                id, title, source_kind, workflow_mode, source_url, primary_job_id,
                status, storage_prefix, metadata_json, created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                title = excluded.title,
                source_kind = excluded.source_kind,
                workflow_mode = excluded.workflow_mode,
                source_url = excluded.source_url,
                primary_job_id = excluded.primary_job_id,
                status = excluded.status,
                storage_prefix = excluded.storage_prefix,
                metadata_json = excluded.metadata_json,
                updated_at = excluded.updated_at
            """,
            (
                doc["document_id"],
                doc["title"],
                doc["source_kind"],
                doc["workflow_mode"],
                str(job.get("url") or ""),
                job_id,
                str(job.get("status") or "created"),
                doc["storage_prefix"],
                metadata_json,
                now,
                now,
            ),
        )

    @staticmethod
    def _upsert_job(
        conn: sqlite3.Connection,
        job: dict[str, Any],
        job_id: str,
        job_path: Path,
        doc: dict[str, Any],
        metadata_json: str,
        now: float,
    ) -> None:
        """Insert the job row or refresh the existing one."""
        # As for documents, created_at is preserved on conflict.
        conn.execute(
            """
            INSERT INTO jobs(
                id, document_id, source_kind, workflow_mode, source_url, status,
                progress, message, storage_path, state_path, video_url, duration,
                width, height, frame_count, video_count, error, metadata_json,
                created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                document_id = excluded.document_id,
                source_kind = excluded.source_kind,
                workflow_mode = excluded.workflow_mode,
                source_url = excluded.source_url,
                status = excluded.status,
                progress = excluded.progress,
                message = excluded.message,
                storage_path = excluded.storage_path,
                state_path = excluded.state_path,
                video_url = excluded.video_url,
                duration = excluded.duration,
                width = excluded.width,
                height = excluded.height,
                frame_count = excluded.frame_count,
                video_count = excluded.video_count,
                error = excluded.error,
                metadata_json = excluded.metadata_json,
                updated_at = excluded.updated_at
            """,
            (
                job_id,
                doc["document_id"],
                doc["source_kind"],
                doc["workflow_mode"],
                str(job.get("url") or ""),
                str(job.get("status") or "created"),
                int(job.get("progress") or 0),
                str(job.get("message") or ""),
                str(job_path),
                str(job_path / "state.json"),
                str(job.get("video_url") or ""),
                float(job.get("duration") or 0),
                int(job.get("width") or 0),
                int(job.get("height") or 0),
                len(job.get("frames") or []),
                len(job.get("generated_videos") or []),
                str(job.get("error") or ""),
                metadata_json,
                now,
                now,
            ),
        )

    def _replace_assets(
        self,
        conn: sqlite3.Connection,
        job: dict[str, Any],
        job_id: str,
        job_path: Path,
        document_id: str,
        now: float,
    ) -> None:
        """Drop the job's asset rows and insert the ones derived from *job*."""
        conn.execute("DELETE FROM media_assets WHERE job_id = ?", (job_id,))
        rows = [
            (
                asset["id"],
                asset["document_id"],
                asset["job_id"],
                asset["kind"],
                asset["role"],
                asset.get("path", ""),
                asset.get("url", ""),
                asset.get("frame_index"),
                asset.get("timestamp"),
                int(asset.get("width") or 0),
                int(asset.get("height") or 0),
                float(asset.get("duration") or 0),
                json.dumps(asset.get("metadata") or {}, ensure_ascii=False),
                now,
                now,
            )
            for asset in self._job_assets(job, job_path, document_id)
        ]
        # OR REPLACE: asset ids such as "<job>:scene_asset:<id>" do not include
        # the frame index, so the same asset listed under two frames yields the
        # same primary key. A plain INSERT would raise IntegrityError and roll
        # back the whole sync; here the last occurrence wins instead.
        conn.executemany(
            """
            INSERT OR REPLACE INTO media_assets(
                id, document_id, job_id, kind, role, path, url, frame_index,
                timestamp, width, height, duration, metadata_json,
                created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            rows,
        )

    def _job_assets(
        self, job: dict[str, Any], job_path: Path, document_id: str
    ) -> list[dict[str, Any]]:
        """Flatten every media item referenced by *job* into asset dicts.

        Covers the source video and audio, per-frame keyframes, cleaned
        keyframes, generated images, scene assets, element cutouts and subject
        assets, plus job-level product references and generated videos.
        Items without an id are skipped where an id is required.
        """
        job_id = str(job.get("id") or "")
        items: list[dict[str, Any]] = []

        def add(
            asset_id: str,
            kind: str,
            role: str,
            path: Path | str = "",
            url: str = "",
            frame_index: int | None = None,
            timestamp: float | None = None,
            width: int = 0,
            height: int = 0,
            duration: float = 0.0,
            metadata: dict[str, Any] | None = None,
        ) -> None:
            items.append(
                {
                    "id": asset_id,
                    "document_id": document_id,
                    "job_id": job_id,
                    "kind": kind,
                    "role": role,
                    "path": str(path) if path else "",
                    "url": url,
                    "frame_index": frame_index,
                    "timestamp": timestamp,
                    "width": width,
                    "height": height,
                    "duration": duration,
                    "metadata": metadata or {},
                }
            )

        # Source media is recorded when the file exists on disk or the job
        # dict carries a URL for it.
        if (job_path / "source.mp4").exists() or job.get("video_url"):
            add(
                f"{job_id}:source_video",
                "video",
                "source_video",
                job_path / "source.mp4",
                str(job.get("video_url") or f"/jobs/{job_id}/video.mp4"),
                duration=float(job.get("duration") or 0),
                width=int(job.get("width") or 0),
                height=int(job.get("height") or 0),
            )
        if (job_path / "audio.wav").exists() or job.get("source_audio_url"):
            add(
                f"{job_id}:source_audio",
                "audio",
                "source_audio",
                job_path / "audio.wav",
                str(job.get("source_audio_url") or f"/jobs/{job_id}/audio.wav"),
                duration=float(job.get("duration") or 0),
            )

        for frame in job.get("frames") or []:
            idx = int(frame.get("index") or 0)
            add(
                f"{job_id}:frame:{idx}",
                "image",
                "keyframe",
                job_path / "frames" / f"{idx:03d}.jpg",
                str(frame.get("url") or f"/jobs/{job_id}/frames/{idx}.jpg"),
                frame_index=idx,
                timestamp=float(frame.get("timestamp") or 0),
                metadata={"quality_report": frame.get("quality_report")},
            )
            if frame.get("cleaned_url"):
                add(
                    f"{job_id}:frame:{idx}:cleaned",
                    "image",
                    "cleaned_keyframe",
                    job_path / "cleaned" / f"{idx:03d}.jpg",
                    str(frame.get("cleaned_url")),
                    frame_index=idx,
                    timestamp=float(frame.get("timestamp") or 0),
                )
            for generated in frame.get("generated_images") or []:
                gen_id = str(generated.get("id") or "")
                if gen_id:
                    add(
                        f"{job_id}:generated_image:{idx}:{gen_id}",
                        "image",
                        "generated_image",
                        job_path / "gen" / f"{idx:03d}_{gen_id}.jpg",
                        str(generated.get("url") or ""),
                        frame_index=idx,
                        metadata={
                            "model": generated.get("model"),
                            "mode": generated.get("mode"),
                        },
                    )
            for scene_asset in frame.get("scene_assets") or []:
                asset_id = str(scene_asset.get("id") or "")
                if asset_id:
                    add(
                        f"{job_id}:scene_asset:{asset_id}",
                        "image",
                        str(scene_asset.get("asset_role") or "scene_asset"),
                        job_path / "assets" / f"{asset_id}.jpg",
                        str(scene_asset.get("url") or ""),
                        frame_index=idx,
                        width=int(scene_asset.get("width") or 0),
                        height=int(scene_asset.get("height") or 0),
                        metadata={
                            "label": scene_asset.get("label"),
                            "scene_mode": scene_asset.get("scene_mode"),
                        },
                    )
            for element in frame.get("elements") or []:
                element_id = str(element.get("id") or "")
                cutout_ids = list(element.get("cutouts") or [])
                # A single "cutout_id" field is honoured alongside the
                # "cutouts" list, without duplicating an id already listed.
                legacy_cutout = element.get("cutout_id")
                if legacy_cutout and legacy_cutout not in cutout_ids:
                    cutout_ids.append(legacy_cutout)
                for cutout_id in cutout_ids:
                    add(
                        f"{job_id}:cutout:{idx}:{element_id}:{cutout_id}",
                        "image",
                        "element_cutout",
                        job_path / "elements" / f"{idx:03d}_{element_id}_{cutout_id}.jpg",
                        f"/jobs/{job_id}/frames/{idx}/elements/{element_id}/cutouts/{cutout_id}.jpg",
                        frame_index=idx,
                        metadata={
                            "element_id": element_id,
                            "name_zh": element.get("name_zh"),
                        },
                    )
                for subject_asset in element.get("subject_assets") or []:
                    asset_id = str(subject_asset.get("id") or "")
                    if asset_id:
                        add(
                            f"{job_id}:subject_asset:{asset_id}",
                            "image",
                            "subject_asset",
                            job_path / "assets" / f"{asset_id}.jpg",
                            str(subject_asset.get("url") or ""),
                            frame_index=idx,
                            width=int(subject_asset.get("width") or 0),
                            height=int(subject_asset.get("height") or 0),
                            metadata={
                                "view": subject_asset.get("view"),
                                "label": subject_asset.get("label"),
                            },
                        )

        for ref in job.get("product_refs") or []:
            asset_id = str(ref.get("id") or ref.get("asset_id") or ref.get("url") or "")
            if asset_id:
                ref_url = str(ref.get("url") or "")
                add(
                    f"{job_id}:product_ref:{asset_id}",
                    "image",
                    "product_ref",
                    self._path_from_job_url(job_path, job_id, ref_url),
                    ref_url,
                    metadata=ref,
                )

        for video in job.get("generated_videos") or []:
            video_id = str(video.get("id") or "")
            if video_id:
                add(
                    f"{job_id}:generated_video:{video_id}",
                    "video",
                    "generated_video",
                    job_path / "videos" / f"{video_id}.mp4",
                    str(video.get("url") or ""),
                    frame_index=video.get("frame_idx"),
                    duration=float(video.get("duration") or 0),
                    metadata={
                        "status": video.get("status"),
                        "model": video.get("model"),
                        "error": video.get("error"),
                    },
                )
        return items

    def _path_from_job_url(self, job_path: Path, job_id: str, url: str) -> str:
        """Map a ``/jobs/<job_id>/...`` URL to a path under *job_path*.

        Returns an empty string for URLs outside that prefix. ``video.mp4`` is
        mapped to ``source.mp4``.
        """
        prefix = f"/jobs/{job_id}/"
        if not url.startswith(prefix):
            return ""
        tail = url[len(prefix):]
        if tail == "video.mp4":
            return str(job_path / "source.mp4")
        return str(job_path / tail)

    def delete_job(self, job_id: str) -> None:
        """Delete a job and, if it was the document's last job, the document too.

        Does nothing when the database is disabled.
        """
        if not self.enabled:
            return
        with self._session() as conn:
            row = conn.execute(
                "SELECT document_id FROM jobs WHERE id = ?", (job_id,)
            ).fetchone()
            conn.execute("DELETE FROM jobs WHERE id = ?", (job_id,))
            if row:
                remaining = conn.execute(
                    "SELECT COUNT(*) AS c FROM jobs WHERE document_id = ?",
                    (row["document_id"],),
                ).fetchone()
                if int(remaining["c"] or 0) == 0:
                    conn.execute(
                        "DELETE FROM documents WHERE id = ?", (row["document_id"],)
                    )

    def list_documents(self, limit: int | None = None) -> list[dict[str, Any]]:
        """Return documents, most recently updated first, with job/asset counts.

        Args:
            limit: maximum number of rows; ``None`` or a non-positive value
                means no limit.
        """
        sql = """
            SELECT d.*,
                   COUNT(DISTINCT j.id) AS job_count,
                   COUNT(DISTINCT a.id) AS asset_count
            FROM documents d
            LEFT JOIN jobs j ON j.document_id = d.id
            LEFT JOIN media_assets a ON a.document_id = d.id
            GROUP BY d.id
            ORDER BY d.updated_at DESC
        """
        params: tuple[Any, ...] = ()
        if limit is not None and limit > 0:
            sql += " LIMIT ?"
            params = (limit,)
        with self._session() as conn:
            rows = conn.execute(sql, params).fetchall()
        return [dict(row) for row in rows]

    def health(self) -> dict[str, Any]:
        """Return a status dict with row counts; never raises.

        On any failure the dict has ``enabled`` set to False and carries the
        error message. The URL is always reported in redacted form.
        """
        if not self.enabled:
            return {"enabled": False, "url": redact_database_url(self.url), "error": self.error}
        try:
            with self._session() as conn:
                docs = conn.execute("SELECT COUNT(*) AS c FROM documents").fetchone()["c"]
                jobs = conn.execute("SELECT COUNT(*) AS c FROM jobs").fetchone()["c"]
                assets = conn.execute("SELECT COUNT(*) AS c FROM media_assets").fetchone()["c"]
            return {
                "enabled": True,
                "url": redact_database_url(self.url),
                "schema_version": SCHEMA_VERSION,
                "documents": int(docs or 0),
                "jobs": int(jobs or 0),
                "assets": int(assets or 0),
            }
        except Exception as e:  # health probe boundary: report instead of raising
            return {"enabled": False, "url": redact_database_url(self.url), "error": str(e)}


def create_database(url: str, jobs_dir: Path) -> AppDatabase:
    """Build an ``AppDatabase`` for *url* and make sure its schema exists."""
    db = AppDatabase(url, jobs_dir)
    db.init()
    return db