Files
20260512-skg-tk/api/database.py

537 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations

import json
import os
import sqlite3
import time
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import Any
# Version of the table layout created by AppDatabase.init(). It is written to
# the schema_meta table under the 'schema_version' key and reported by
# AppDatabase.health().
SCHEMA_VERSION = 1
def default_database_url(jobs_dir: Path) -> str:
    """Pick the database URL for the application.

    The first non-empty value among the ``APP_DB_URL`` and ``DATABASE_URL``
    environment variables wins (checked in that order). When neither is set,
    a SQLite URL pointing at ``app.db`` inside *jobs_dir* is returned.
    """
    for env_name in ("APP_DB_URL", "DATABASE_URL"):
        configured = os.getenv(env_name)
        if configured:
            return configured
    return f"sqlite:///{jobs_dir / 'app.db'}"
def redact_database_url(url: str) -> str:
    """Return *url* with any credentials replaced by ``***``.

    Everything between ``scheme://`` and the last ``@`` is hidden, so
    ``postgres://user:pw@host/db`` becomes ``postgres://***@host/db``.
    URLs without a scheme separator, or without an ``@`` after it, are
    returned unchanged.
    """
    scheme, separator, rest = url.partition("://")
    # Look for "@" only in the part after the scheme: an "@" that appears
    # solely before "://" is not a credential marker, and unpacking the
    # result of rsplit() on a string without "@" would raise ValueError.
    if not separator or "@" not in rest:
        return url
    # Split on the LAST "@" so that passwords containing "@" stay hidden.
    host = rest.rsplit("@", 1)[1]
    return f"{scheme}://***@{host}"
def infer_source_kind(url: str) -> str:
    """Classify a job URL by its scheme prefix.

    Returns ``"upload"`` for ``upload://`` URLs, ``"tiktok_link"`` for
    ``http://`` / ``https://`` URLs, and ``"unknown"`` for anything else.
    The match is case-sensitive.
    """
    prefix_to_kind = (
        (("upload://",), "upload"),
        (("http://", "https://"), "tiktok_link"),
    )
    for prefixes, kind in prefix_to_kind:
        if url.startswith(prefixes):
            return kind
    return "unknown"
def default_workflow_mode(source_kind: str) -> str:
    """Return the workflow mode used when a job does not specify one.

    Uploads map to ``"uploaded_reference"``; every other source kind maps to
    ``"feed_recreation"``.
    """
    return "uploaded_reference" if source_kind == "upload" else "feed_recreation"
def document_title(url: str, source_kind: str, fallback: str) -> str:
    """Derive a human-readable document title from the job URL.

    Args:
        url: Source URL of the job (``upload://<name>`` for uploads).
        source_kind: Source classification; ``"upload"`` selects the
            upload-name branch.
        fallback: Title to use when nothing usable can be derived.

    Returns:
        For uploads, the URL with the first ``upload://`` removed and
        surrounding whitespace stripped. Otherwise the stripped URL truncated
        to 120 characters. *fallback* whenever the derived title is empty,
        which includes whitespace-only URLs.
    """
    if source_kind == "upload":
        return url.replace("upload://", "", 1).strip() or fallback
    # A blank or whitespace-only URL must not produce an empty title.
    title = url.strip()[:120] if url else ""
    return title or fallback
def storage_prefix(document_id: str, source_kind: str, workflow_mode: str) -> str:
    """Build the ``<mode>/<source>/<document_id>`` storage prefix.

    An empty *source_kind* is replaced by ``"unknown"``; an empty
    *workflow_mode* is replaced by the default mode for that source kind.
    """
    kind = source_kind if source_kind else "unknown"
    if workflow_mode:
        chosen_mode = workflow_mode
    else:
        chosen_mode = default_workflow_mode(kind)
    return f"{chosen_mode}/{kind}/{document_id}"
class AppDatabase:
    """SQLite-backed index of documents, jobs and their media assets.

    The database mirrors job state dictionaries into three tables
    (``documents``, ``jobs``, ``media_assets``) plus a ``schema_meta``
    key/value table. Only ``sqlite:///<path>`` URLs and the in-memory forms
    ``:memory:`` / ``sqlite:///:memory:`` are accepted.

    Attributes:
        url: The URL the database was created with.
        jobs_dir: Root directory of job folders, as passed by the caller.
        path: Resolved SQLite file path, or ``Path(":memory:")``.
        enabled: When False, write operations and ``health()`` short-circuit.
        error: Free-form error text reported by ``health()`` while disabled.
    """

    def __init__(self, url: str, jobs_dir: Path):
        """Parse *url* and remember the configuration; no I/O is performed.

        Raises:
            RuntimeError: If *url* is not a supported SQLite URL.
        """
        self.url = url
        self.jobs_dir = jobs_dir
        self.path = self._sqlite_path(url)
        self.enabled = True
        self.error = ""
        # An in-memory SQLite database lives only as long as its connection,
        # so a single connection is kept and reused for that case.
        self._memory_conn: sqlite3.Connection | None = None

    @staticmethod
    def _sqlite_path(url: str) -> Path:
        """Translate a database URL into a filesystem path.

        Returns ``Path(":memory:")`` for the in-memory forms, otherwise the
        absolute, user-expanded path that follows ``sqlite:///``.

        Raises:
            RuntimeError: If the URL does not use the ``sqlite:///`` scheme.
        """
        if url == ":memory:":
            return Path(":memory:")
        if not url.startswith("sqlite:///"):
            # Message (Chinese): only sqlite:/// URLs are supported by the
            # built-in layer; a Postgres migration would reuse the same
            # table semantics.
            raise RuntimeError("当前内置数据库层只支持 sqlite:/// URLPostgres 迁移会复用同一张表语义。")
        raw = url[len("sqlite:///"):]
        if raw == ":memory:":
            # Without this, resolve() would turn it into a real file named
            # ":memory:" in the current working directory.
            return Path(":memory:")
        return Path(raw).expanduser().resolve()

    def _is_memory(self) -> bool:
        """Return True when this database is an in-memory SQLite database."""
        return str(self.path) == ":memory:"

    @staticmethod
    def _open(target: str) -> sqlite3.Connection:
        """Open a connection with row access by name and foreign keys on."""
        conn = sqlite3.connect(target)
        conn.row_factory = sqlite3.Row
        # SQLite does not enforce foreign keys unless asked per connection;
        # the ON DELETE CASCADE clauses in the schema depend on this.
        conn.execute("PRAGMA foreign_keys = ON")
        return conn

    def connect(self) -> sqlite3.Connection:
        """Return a configured SQLite connection.

        For file-backed databases a new connection is opened on every call
        (the parent directory is created if needed) and the caller is
        responsible for closing it. For in-memory databases one shared
        connection is returned on every call so that data persists between
        operations; callers should not close it.
        """
        if self._is_memory():
            conn = getattr(self, "_memory_conn", None)
            if conn is None:
                conn = self._open(":memory:")
                self._memory_conn = conn
            return conn
        self.path.parent.mkdir(parents=True, exist_ok=True)
        return self._open(str(self.path))

    @contextmanager
    def _session(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection inside a transaction and release it afterwards.

        The transaction is committed on success and rolled back on error.
        ``sqlite3.Connection`` used as a context manager does not close the
        connection, so file-backed connections are closed here explicitly;
        the shared in-memory connection is left open.
        """
        conn = self.connect()
        try:
            with conn:
                yield conn
        finally:
            if conn is not getattr(self, "_memory_conn", None):
                conn.close()

    def init(self) -> None:
        """Create tables and indexes if missing and record the schema version."""
        with self._session() as conn:
            conn.executescript(
                """
                CREATE TABLE IF NOT EXISTS schema_meta (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL
                );
                CREATE TABLE IF NOT EXISTS documents (
                    id TEXT PRIMARY KEY,
                    title TEXT NOT NULL,
                    source_kind TEXT NOT NULL,
                    workflow_mode TEXT NOT NULL,
                    source_url TEXT NOT NULL DEFAULT '',
                    primary_job_id TEXT NOT NULL DEFAULT '',
                    status TEXT NOT NULL DEFAULT 'created',
                    storage_prefix TEXT NOT NULL,
                    metadata_json TEXT NOT NULL DEFAULT '{}',
                    created_at REAL NOT NULL,
                    updated_at REAL NOT NULL
                );
                CREATE TABLE IF NOT EXISTS jobs (
                    id TEXT PRIMARY KEY,
                    document_id TEXT NOT NULL,
                    source_kind TEXT NOT NULL,
                    workflow_mode TEXT NOT NULL,
                    source_url TEXT NOT NULL DEFAULT '',
                    status TEXT NOT NULL,
                    progress INTEGER NOT NULL DEFAULT 0,
                    message TEXT NOT NULL DEFAULT '',
                    storage_path TEXT NOT NULL,
                    state_path TEXT NOT NULL,
                    video_url TEXT NOT NULL DEFAULT '',
                    duration REAL NOT NULL DEFAULT 0,
                    width INTEGER NOT NULL DEFAULT 0,
                    height INTEGER NOT NULL DEFAULT 0,
                    frame_count INTEGER NOT NULL DEFAULT 0,
                    video_count INTEGER NOT NULL DEFAULT 0,
                    error TEXT NOT NULL DEFAULT '',
                    metadata_json TEXT NOT NULL DEFAULT '{}',
                    created_at REAL NOT NULL,
                    updated_at REAL NOT NULL,
                    FOREIGN KEY(document_id) REFERENCES documents(id) ON DELETE CASCADE
                );
                CREATE TABLE IF NOT EXISTS media_assets (
                    id TEXT PRIMARY KEY,
                    document_id TEXT NOT NULL,
                    job_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    role TEXT NOT NULL,
                    path TEXT NOT NULL DEFAULT '',
                    url TEXT NOT NULL DEFAULT '',
                    frame_index INTEGER,
                    timestamp REAL,
                    width INTEGER NOT NULL DEFAULT 0,
                    height INTEGER NOT NULL DEFAULT 0,
                    duration REAL NOT NULL DEFAULT 0,
                    metadata_json TEXT NOT NULL DEFAULT '{}',
                    created_at REAL NOT NULL,
                    updated_at REAL NOT NULL,
                    FOREIGN KEY(document_id) REFERENCES documents(id) ON DELETE CASCADE,
                    FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_documents_updated_at ON documents(updated_at DESC);
                CREATE INDEX IF NOT EXISTS idx_documents_source_kind ON documents(source_kind);
                CREATE INDEX IF NOT EXISTS idx_documents_workflow_mode ON documents(workflow_mode);
                CREATE INDEX IF NOT EXISTS idx_jobs_document_id ON jobs(document_id);
                CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
                CREATE INDEX IF NOT EXISTS idx_assets_document_id ON media_assets(document_id);
                CREATE INDEX IF NOT EXISTS idx_assets_job_id ON media_assets(job_id);
                CREATE INDEX IF NOT EXISTS idx_assets_role ON media_assets(role);
                """
            )
            conn.execute(
                "INSERT OR REPLACE INTO schema_meta(key, value) VALUES('schema_version', ?)",
                (str(SCHEMA_VERSION),),
            )

    def normalize_job_document(self, job: dict[str, Any]) -> dict[str, Any]:
        """Fill in the document-level fields of *job* with defaults.

        Missing or empty values are derived: the source kind from the URL,
        the workflow mode from the source kind, the document id from the job
        id, and the storage prefix from those three.

        Returns:
            A dict with ``document_id``, ``source_kind``, ``workflow_mode``,
            ``storage_prefix`` and ``title``.
        """
        job_id = str(job.get("id") or "")
        source_url = str(job.get("url") or "")
        source_kind = str(job.get("source_kind") or "") or infer_source_kind(source_url)
        workflow_mode = str(job.get("workflow_mode") or "") or default_workflow_mode(source_kind)
        document_id = str(job.get("document_id") or "") or job_id
        prefix = str(job.get("storage_prefix") or "") or storage_prefix(document_id, source_kind, workflow_mode)
        return {
            "document_id": document_id,
            "source_kind": source_kind,
            "workflow_mode": workflow_mode,
            "storage_prefix": prefix,
            "title": document_title(source_url, source_kind, document_id),
        }

    def sync_job(self, job: dict[str, Any], job_path: Path) -> None:
        """Mirror one job state dict into the database.

        Upserts the owning document and the job row, then rebuilds the job's
        media asset rows from scratch. All of it happens in one transaction.
        Does nothing when the database is disabled or the job has no id.

        Args:
            job: Job state dictionary.
            job_path: Directory holding the job's files; ``state.json`` and
                asset paths are derived from it.
        """
        if not self.enabled:
            return
        job_id = str(job.get("id") or "")
        if not job_id:
            return
        now = time.time()
        doc = self.normalize_job_document(job)
        metadata_json = json.dumps(
            {
                "audio_segment_count": len(job.get("transcript") or []),
                "product_ref_count": len(job.get("product_refs") or []),
                "storyboard_image_count": len(job.get("storyboard_images") or []),
            },
            ensure_ascii=False,
        )
        with self._session() as conn:
            self._upsert_document(conn, job, job_id, doc, metadata_json, now)
            self._upsert_job(conn, job, job_id, job_path, doc, metadata_json, now)
            self._replace_assets(conn, job, job_path, doc["document_id"], now)

    @staticmethod
    def _upsert_document(
        conn: sqlite3.Connection,
        job: dict[str, Any],
        job_id: str,
        doc: dict[str, Any],
        metadata_json: str,
        now: float,
    ) -> None:
        """Insert the document row, or refresh it if it already exists."""
        # created_at is deliberately absent from the DO UPDATE list, so an
        # existing row keeps its original creation time and no prior SELECT
        # is needed to preserve it.
        conn.execute(
            """
            INSERT INTO documents(
                id, title, source_kind, workflow_mode, source_url, primary_job_id,
                status, storage_prefix, metadata_json, created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                title = excluded.title,
                source_kind = excluded.source_kind,
                workflow_mode = excluded.workflow_mode,
                source_url = excluded.source_url,
                primary_job_id = excluded.primary_job_id,
                status = excluded.status,
                storage_prefix = excluded.storage_prefix,
                metadata_json = excluded.metadata_json,
                updated_at = excluded.updated_at
            """,
            (
                doc["document_id"],
                doc["title"],
                doc["source_kind"],
                doc["workflow_mode"],
                str(job.get("url") or ""),
                job_id,
                str(job.get("status") or "created"),
                doc["storage_prefix"],
                metadata_json,
                now,
                now,
            ),
        )

    @staticmethod
    def _upsert_job(
        conn: sqlite3.Connection,
        job: dict[str, Any],
        job_id: str,
        job_path: Path,
        doc: dict[str, Any],
        metadata_json: str,
        now: float,
    ) -> None:
        """Insert the job row, or refresh it if it already exists."""
        frame_count = len(list(job.get("frames") or []))
        video_count = len(list(job.get("generated_videos") or []))
        # As with documents, created_at is only written on first insert.
        conn.execute(
            """
            INSERT INTO jobs(
                id, document_id, source_kind, workflow_mode, source_url, status,
                progress, message, storage_path, state_path, video_url, duration,
                width, height, frame_count, video_count, error, metadata_json,
                created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                document_id = excluded.document_id,
                source_kind = excluded.source_kind,
                workflow_mode = excluded.workflow_mode,
                source_url = excluded.source_url,
                status = excluded.status,
                progress = excluded.progress,
                message = excluded.message,
                storage_path = excluded.storage_path,
                state_path = excluded.state_path,
                video_url = excluded.video_url,
                duration = excluded.duration,
                width = excluded.width,
                height = excluded.height,
                frame_count = excluded.frame_count,
                video_count = excluded.video_count,
                error = excluded.error,
                metadata_json = excluded.metadata_json,
                updated_at = excluded.updated_at
            """,
            (
                job_id,
                doc["document_id"],
                doc["source_kind"],
                doc["workflow_mode"],
                str(job.get("url") or ""),
                str(job.get("status") or "created"),
                int(job.get("progress") or 0),
                str(job.get("message") or ""),
                str(job_path),
                str(job_path / "state.json"),
                str(job.get("video_url") or ""),
                float(job.get("duration") or 0),
                int(job.get("width") or 0),
                int(job.get("height") or 0),
                frame_count,
                video_count,
                str(job.get("error") or ""),
                metadata_json,
                now,
                now,
            ),
        )

    def _replace_assets(
        self,
        conn: sqlite3.Connection,
        job: dict[str, Any],
        job_path: Path,
        document_id: str,
        now: float,
    ) -> None:
        """Delete the job's asset rows and re-insert them from the job state."""
        job_id = str(job.get("id") or "")
        conn.execute("DELETE FROM media_assets WHERE job_id = ?", (job_id,))
        rows = [
            (
                asset["id"],
                asset["document_id"],
                asset["job_id"],
                asset["kind"],
                asset["role"],
                asset.get("path", ""),
                asset.get("url", ""),
                asset.get("frame_index"),
                asset.get("timestamp"),
                int(asset.get("width") or 0),
                int(asset.get("height") or 0),
                float(asset.get("duration") or 0),
                json.dumps(asset.get("metadata") or {}, ensure_ascii=False),
                now,
                now,
            )
            for asset in self._job_assets(job, job_path, document_id)
        ]
        # Asset ids are derived from the job state, so the same id can appear
        # twice (e.g. one scene asset referenced by two frames). OR REPLACE
        # keeps the last occurrence instead of aborting the whole sync with
        # an IntegrityError.
        conn.executemany(
            """
            INSERT OR REPLACE INTO media_assets(
                id, document_id, job_id, kind, role, path, url, frame_index,
                timestamp, width, height, duration, metadata_json, created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            rows,
        )

    def _job_assets(self, job: dict[str, Any], job_path: Path, document_id: str) -> list[dict[str, Any]]:
        """Flatten a job state dict into a list of media asset records.

        Covers the source video and audio, every frame with its cleaned
        version, generated images, scene assets, element cutouts and subject
        assets, plus product references and generated videos. Each record
        carries a deterministic id prefixed with the job id.
        """
        job_id = str(job.get("id") or "")
        items: list[dict[str, Any]] = []

        def add(
            asset_id: str,
            kind: str,
            role: str,
            path: Path | str = "",
            url: str = "",
            frame_index: int | None = None,
            timestamp: float | None = None,
            width: int = 0,
            height: int = 0,
            duration: float = 0.0,
            metadata: dict[str, Any] | None = None,
        ) -> None:
            """Append one asset record to ``items``."""
            items.append({
                "id": asset_id,
                "document_id": document_id,
                "job_id": job_id,
                "kind": kind,
                "role": role,
                "path": str(path) if path else "",
                "url": url,
                "frame_index": frame_index,
                "timestamp": timestamp,
                "width": width,
                "height": height,
                "duration": duration,
                "metadata": metadata or {},
            })

        # Source media: recorded when the file exists on disk or the job
        # state carries a URL for it.
        if (job_path / "source.mp4").exists() or job.get("video_url"):
            add(
                f"{job_id}:source_video",
                "video",
                "source_video",
                job_path / "source.mp4",
                str(job.get("video_url") or f"/jobs/{job_id}/video.mp4"),
                duration=float(job.get("duration") or 0),
                width=int(job.get("width") or 0),
                height=int(job.get("height") or 0),
            )
        if (job_path / "audio.wav").exists() or job.get("source_audio_url"):
            add(
                f"{job_id}:source_audio",
                "audio",
                "source_audio",
                job_path / "audio.wav",
                str(job.get("source_audio_url") or f"/jobs/{job_id}/audio.wav"),
                duration=float(job.get("duration") or 0),
            )

        for frame in job.get("frames") or []:
            idx = int(frame.get("index") or 0)
            add(
                f"{job_id}:frame:{idx}",
                "image",
                "keyframe",
                job_path / "frames" / f"{idx:03d}.jpg",
                str(frame.get("url") or f"/jobs/{job_id}/frames/{idx}.jpg"),
                frame_index=idx,
                timestamp=float(frame.get("timestamp") or 0),
                metadata={"quality_report": frame.get("quality_report")},
            )
            if frame.get("cleaned_url"):
                add(
                    f"{job_id}:frame:{idx}:cleaned",
                    "image",
                    "cleaned_keyframe",
                    job_path / "cleaned" / f"{idx:03d}.jpg",
                    str(frame.get("cleaned_url")),
                    frame_index=idx,
                    timestamp=float(frame.get("timestamp") or 0),
                )
            for generated in frame.get("generated_images") or []:
                gen_id = str(generated.get("id") or "")
                if gen_id:
                    add(
                        f"{job_id}:generated_image:{idx}:{gen_id}",
                        "image",
                        "generated_image",
                        job_path / "gen" / f"{idx:03d}_{gen_id}.jpg",
                        str(generated.get("url") or ""),
                        frame_index=idx,
                        metadata={"model": generated.get("model"), "mode": generated.get("mode")},
                    )
            for scene_asset in frame.get("scene_assets") or []:
                asset_id = str(scene_asset.get("id") or "")
                if asset_id:
                    add(
                        f"{job_id}:scene_asset:{asset_id}",
                        "image",
                        str(scene_asset.get("asset_role") or "scene_asset"),
                        job_path / "assets" / f"{asset_id}.jpg",
                        str(scene_asset.get("url") or ""),
                        frame_index=idx,
                        width=int(scene_asset.get("width") or 0),
                        height=int(scene_asset.get("height") or 0),
                        metadata={"label": scene_asset.get("label"), "scene_mode": scene_asset.get("scene_mode")},
                    )
            for element in frame.get("elements") or []:
                element_id = str(element.get("id") or "")
                cutout_ids = list(element.get("cutouts") or [])
                # Older state stores a single cutout under "cutout_id";
                # fold it into the list unless it is already there.
                legacy_cutout = element.get("cutout_id")
                if legacy_cutout and legacy_cutout not in cutout_ids:
                    cutout_ids.append(legacy_cutout)
                for cutout_id in cutout_ids:
                    add(
                        f"{job_id}:cutout:{idx}:{element_id}:{cutout_id}",
                        "image",
                        "element_cutout",
                        job_path / "elements" / f"{idx:03d}_{element_id}_{cutout_id}.jpg",
                        f"/jobs/{job_id}/frames/{idx}/elements/{element_id}/cutouts/{cutout_id}.jpg",
                        frame_index=idx,
                        metadata={"element_id": element_id, "name_zh": element.get("name_zh")},
                    )
                for subject_asset in element.get("subject_assets") or []:
                    asset_id = str(subject_asset.get("id") or "")
                    if asset_id:
                        add(
                            f"{job_id}:subject_asset:{asset_id}",
                            "image",
                            "subject_asset",
                            job_path / "assets" / f"{asset_id}.jpg",
                            str(subject_asset.get("url") or ""),
                            frame_index=idx,
                            width=int(subject_asset.get("width") or 0),
                            height=int(subject_asset.get("height") or 0),
                            metadata={"view": subject_asset.get("view"), "label": subject_asset.get("label")},
                        )

        for ref in job.get("product_refs") or []:
            # Product references may lack an id; fall back to asset_id, then URL.
            asset_id = str(ref.get("id") or ref.get("asset_id") or ref.get("url") or "")
            if asset_id:
                add(
                    f"{job_id}:product_ref:{asset_id}",
                    "image",
                    "product_ref",
                    self._path_from_job_url(job_path, job_id, str(ref.get("url") or "")),
                    str(ref.get("url") or ""),
                    metadata=ref,
                )

        for video in job.get("generated_videos") or []:
            video_id = str(video.get("id") or "")
            if video_id:
                add(
                    f"{job_id}:generated_video:{video_id}",
                    "video",
                    "generated_video",
                    job_path / "videos" / f"{video_id}.mp4",
                    str(video.get("url") or ""),
                    frame_index=video.get("frame_idx"),
                    duration=float(video.get("duration") or 0),
                    metadata={"status": video.get("status"), "model": video.get("model"), "error": video.get("error")},
                )
        return items

    def _path_from_job_url(self, job_path: Path, job_id: str, url: str) -> str:
        """Map a ``/jobs/<job_id>/...`` URL to a path under *job_path*.

        Returns an empty string for URLs outside that prefix. The special
        tail ``video.mp4`` maps to the ``source.mp4`` file.
        """
        prefix = f"/jobs/{job_id}/"
        if not url.startswith(prefix):
            return ""
        tail = url[len(prefix):]
        if tail == "video.mp4":
            return str(job_path / "source.mp4")
        return str(job_path / tail)

    def delete_job(self, job_id: str) -> None:
        """Delete a job and, if it was the last one, its document too.

        Does nothing when the database is disabled. Deleting an unknown job
        id is not an error.
        """
        if not self.enabled:
            return
        with self._session() as conn:
            row = conn.execute("SELECT document_id FROM jobs WHERE id = ?", (job_id,)).fetchone()
            conn.execute("DELETE FROM jobs WHERE id = ?", (job_id,))
            if row:
                remaining = conn.execute(
                    "SELECT COUNT(*) AS c FROM jobs WHERE document_id = ?",
                    (row["document_id"],),
                ).fetchone()
                if int(remaining["c"] or 0) == 0:
                    conn.execute("DELETE FROM documents WHERE id = ?", (row["document_id"],))

    def list_documents(self, limit: int | None = None) -> list[dict[str, Any]]:
        """Return documents, most recently updated first.

        Each row is a dict of the document columns plus ``job_count`` and
        ``asset_count``.

        Args:
            limit: Maximum number of rows; ``None`` or a non-positive value
                means no limit.
        """
        # Correlated subqueries count each child table on its own. Joining
        # both tables at once would build a jobs x assets product per
        # document only to collapse it again with COUNT(DISTINCT ...).
        sql = """
            SELECT
                d.*,
                (SELECT COUNT(*) FROM jobs j WHERE j.document_id = d.id) AS job_count,
                (SELECT COUNT(*) FROM media_assets a WHERE a.document_id = d.id) AS asset_count
            FROM documents d
            ORDER BY d.updated_at DESC
        """
        params: tuple[Any, ...] = ()
        if limit is not None and limit > 0:
            sql += " LIMIT ?"
            params = (limit,)
        with self._session() as conn:
            rows = conn.execute(sql, params).fetchall()
        return [dict(row) for row in rows]

    def health(self) -> dict[str, Any]:
        """Report database status with row counts; never raises.

        Returns:
            On success: ``enabled`` True, the redacted URL, the schema
            version and the document/job/asset counts. When disabled or on
            any error: ``enabled`` False, the redacted URL and an ``error``
            message.
        """
        if not self.enabled:
            return {"enabled": False, "url": redact_database_url(self.url), "error": self.error}
        try:
            with self._session() as conn:
                docs = conn.execute("SELECT COUNT(*) AS c FROM documents").fetchone()["c"]
                jobs = conn.execute("SELECT COUNT(*) AS c FROM jobs").fetchone()["c"]
                assets = conn.execute("SELECT COUNT(*) AS c FROM media_assets").fetchone()["c"]
            return {
                "enabled": True,
                "url": redact_database_url(self.url),
                "schema_version": SCHEMA_VERSION,
                "documents": int(docs or 0),
                "jobs": int(jobs or 0),
                "assets": int(assets or 0),
            }
        except Exception as e:
            # Health checks are a reporting boundary: any failure is turned
            # into a status payload rather than propagated.
            return {"enabled": False, "url": redact_database_url(self.url), "error": str(e)}
def create_database(url: str, jobs_dir: Path) -> AppDatabase:
    """Build an ``AppDatabase`` for *url* and initialise it before returning."""
    database = AppDatabase(url, jobs_dir)
    database.init()
    return database