537 lines
23 KiB
Python
537 lines
23 KiB
Python
from __future__ import annotations

import json
import os
import sqlite3
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterator
|
||
|
||
|
||
# Schema version stored in the schema_meta table by AppDatabase.init() and
# reported by AppDatabase.health().
SCHEMA_VERSION = 1
|
||
|
||
|
||
def default_database_url(jobs_dir: Path) -> str:
    """Choose the database URL for the application.

    Checks ``$APP_DB_URL`` first, then ``$DATABASE_URL``; empty values count as
    unset. When neither is set, falls back to a SQLite file ``app.db`` inside
    *jobs_dir*.
    """
    for env_name in ("APP_DB_URL", "DATABASE_URL"):
        configured = os.getenv(env_name)
        if configured:
            return configured
    return f"sqlite:///{jobs_dir / 'app.db'}"
|
||
|
||
|
||
def redact_database_url(url: str) -> str:
    """Return *url* with any ``user[:password]@`` credentials masked as ``***@``.

    The URL is returned unchanged in three cases:

    * it has no ``://``;
    * it has no ``@`` after the scheme;
    * it has an empty authority (``scheme:///path``, e.g. a SQLite file URL),
      where any ``@`` belongs to the file path.

    Otherwise everything between ``://`` and the *last* ``@`` is masked, so a
    password containing an unescaped ``@`` is still fully hidden.

    >>> redact_database_url("postgresql://user:secret@db:5432/app")
    'postgresql://***@db:5432/app'
    """
    if "://" not in url:
        return url
    scheme, rest = url.split("://", 1)
    # "@" may appear only before "://"; unpacking rsplit() below would then fail.
    if "@" not in rest:
        return url
    if rest.startswith("/"):
        # Empty authority: there are no credentials to hide, and masking would
        # corrupt a path such as sqlite:////home/me@corp/app.db.
        return url
    _, host = rest.rsplit("@", 1)
    return f"{scheme}://***@{host}"
|
||
|
||
|
||
def infer_source_kind(url: str) -> str:
    """Classify a job's source URL.

    ``upload://...`` is "upload", ``http://`` / ``https://`` links are
    "tiktok_link", and anything else is "unknown".
    """
    if url.startswith(("http://", "https://")):
        return "tiktok_link"
    return "upload" if url.startswith("upload://") else "unknown"
|
||
|
||
|
||
def default_workflow_mode(source_kind: str) -> str:
    """Return the default workflow for a source kind.

    Uploads map to "uploaded_reference"; every other kind maps to
    "feed_recreation".
    """
    return "uploaded_reference" if source_kind == "upload" else "feed_recreation"
|
||
|
||
|
||
def document_title(url: str, source_kind: str, fallback: str) -> str:
    """Derive a human-readable document title from a job's source URL.

    Uploads use whatever follows the ``upload://`` marker. Other sources use
    the stripped URL itself, capped at 120 characters.

    Args:
        url: The job's source URL.
        source_kind: Result of ``infer_source_kind`` (or an explicit value).
        fallback: Returned whenever the derived title would be empty or
            whitespace-only.
    """
    if source_kind == "upload":
        name = url.replace("upload://", "", 1).strip()
        return name or fallback
    # Strip before testing emptiness; otherwise a whitespace-only URL would
    # produce an empty title instead of the fallback.
    cleaned = url.strip()
    return cleaned[:120] if cleaned else fallback
|
||
|
||
|
||
def storage_prefix(document_id: str, source_kind: str, workflow_mode: str) -> str:
    """Build the ``<workflow_mode>/<source_kind>/<document_id>`` storage key.

    An empty *source_kind* becomes "unknown". An empty *workflow_mode* falls
    back to ``default_workflow_mode`` for that (possibly defaulted) kind.
    """
    kind = source_kind if source_kind else "unknown"
    if not workflow_mode:
        workflow_mode = default_workflow_mode(kind)
    return f"{workflow_mode}/{kind}/{document_id}"
|
||
|
||
|
||
class AppDatabase:
|
||
def __init__(self, url: str, jobs_dir: Path):
|
||
self.url = url
|
||
self.jobs_dir = jobs_dir
|
||
self.path = self._sqlite_path(url)
|
||
self.enabled = True
|
||
self.error = ""
|
||
|
||
@staticmethod
|
||
def _sqlite_path(url: str) -> Path:
|
||
if url == ":memory:":
|
||
return Path(":memory:")
|
||
if not url.startswith("sqlite:///"):
|
||
raise RuntimeError("当前内置数据库层只支持 sqlite:/// URL;Postgres 迁移会复用同一张表语义。")
|
||
raw = url[len("sqlite:///"):]
|
||
return Path(raw).expanduser().resolve()
|
||
|
||
def connect(self) -> sqlite3.Connection:
|
||
if str(self.path) != ":memory:":
|
||
self.path.parent.mkdir(parents=True, exist_ok=True)
|
||
conn = sqlite3.connect(str(self.path))
|
||
conn.row_factory = sqlite3.Row
|
||
conn.execute("PRAGMA foreign_keys = ON")
|
||
return conn
|
||
|
||
def init(self) -> None:
|
||
with self.connect() as conn:
|
||
conn.executescript(
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS schema_meta (
|
||
key TEXT PRIMARY KEY,
|
||
value TEXT NOT NULL
|
||
);
|
||
|
||
CREATE TABLE IF NOT EXISTS documents (
|
||
id TEXT PRIMARY KEY,
|
||
title TEXT NOT NULL,
|
||
source_kind TEXT NOT NULL,
|
||
workflow_mode TEXT NOT NULL,
|
||
source_url TEXT NOT NULL DEFAULT '',
|
||
primary_job_id TEXT NOT NULL DEFAULT '',
|
||
status TEXT NOT NULL DEFAULT 'created',
|
||
storage_prefix TEXT NOT NULL,
|
||
metadata_json TEXT NOT NULL DEFAULT '{}',
|
||
created_at REAL NOT NULL,
|
||
updated_at REAL NOT NULL
|
||
);
|
||
|
||
CREATE TABLE IF NOT EXISTS jobs (
|
||
id TEXT PRIMARY KEY,
|
||
document_id TEXT NOT NULL,
|
||
source_kind TEXT NOT NULL,
|
||
workflow_mode TEXT NOT NULL,
|
||
source_url TEXT NOT NULL DEFAULT '',
|
||
status TEXT NOT NULL,
|
||
progress INTEGER NOT NULL DEFAULT 0,
|
||
message TEXT NOT NULL DEFAULT '',
|
||
storage_path TEXT NOT NULL,
|
||
state_path TEXT NOT NULL,
|
||
video_url TEXT NOT NULL DEFAULT '',
|
||
duration REAL NOT NULL DEFAULT 0,
|
||
width INTEGER NOT NULL DEFAULT 0,
|
||
height INTEGER NOT NULL DEFAULT 0,
|
||
frame_count INTEGER NOT NULL DEFAULT 0,
|
||
video_count INTEGER NOT NULL DEFAULT 0,
|
||
error TEXT NOT NULL DEFAULT '',
|
||
metadata_json TEXT NOT NULL DEFAULT '{}',
|
||
created_at REAL NOT NULL,
|
||
updated_at REAL NOT NULL,
|
||
FOREIGN KEY(document_id) REFERENCES documents(id) ON DELETE CASCADE
|
||
);
|
||
|
||
CREATE TABLE IF NOT EXISTS media_assets (
|
||
id TEXT PRIMARY KEY,
|
||
document_id TEXT NOT NULL,
|
||
job_id TEXT NOT NULL,
|
||
kind TEXT NOT NULL,
|
||
role TEXT NOT NULL,
|
||
path TEXT NOT NULL DEFAULT '',
|
||
url TEXT NOT NULL DEFAULT '',
|
||
frame_index INTEGER,
|
||
timestamp REAL,
|
||
width INTEGER NOT NULL DEFAULT 0,
|
||
height INTEGER NOT NULL DEFAULT 0,
|
||
duration REAL NOT NULL DEFAULT 0,
|
||
metadata_json TEXT NOT NULL DEFAULT '{}',
|
||
created_at REAL NOT NULL,
|
||
updated_at REAL NOT NULL,
|
||
FOREIGN KEY(document_id) REFERENCES documents(id) ON DELETE CASCADE,
|
||
FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE
|
||
);
|
||
|
||
CREATE INDEX IF NOT EXISTS idx_documents_updated_at ON documents(updated_at DESC);
|
||
CREATE INDEX IF NOT EXISTS idx_documents_source_kind ON documents(source_kind);
|
||
CREATE INDEX IF NOT EXISTS idx_documents_workflow_mode ON documents(workflow_mode);
|
||
CREATE INDEX IF NOT EXISTS idx_jobs_document_id ON jobs(document_id);
|
||
CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
|
||
CREATE INDEX IF NOT EXISTS idx_assets_document_id ON media_assets(document_id);
|
||
CREATE INDEX IF NOT EXISTS idx_assets_job_id ON media_assets(job_id);
|
||
CREATE INDEX IF NOT EXISTS idx_assets_role ON media_assets(role);
|
||
"""
|
||
)
|
||
conn.execute(
|
||
"INSERT OR REPLACE INTO schema_meta(key, value) VALUES('schema_version', ?)",
|
||
(str(SCHEMA_VERSION),),
|
||
)
|
||
|
||
def normalize_job_document(self, job: dict[str, Any]) -> dict[str, Any]:
|
||
job_id = str(job.get("id") or "")
|
||
source_url = str(job.get("url") or "")
|
||
source_kind = str(job.get("source_kind") or "") or infer_source_kind(source_url)
|
||
workflow_mode = str(job.get("workflow_mode") or "") or default_workflow_mode(source_kind)
|
||
document_id = str(job.get("document_id") or "") or job_id
|
||
prefix = str(job.get("storage_prefix") or "") or storage_prefix(document_id, source_kind, workflow_mode)
|
||
return {
|
||
"document_id": document_id,
|
||
"source_kind": source_kind,
|
||
"workflow_mode": workflow_mode,
|
||
"storage_prefix": prefix,
|
||
"title": document_title(source_url, source_kind, document_id),
|
||
}
|
||
|
||
def sync_job(self, job: dict[str, Any], job_path: Path) -> None:
|
||
if not self.enabled:
|
||
return
|
||
now = time.time()
|
||
job_id = str(job.get("id") or "")
|
||
if not job_id:
|
||
return
|
||
doc = self.normalize_job_document(job)
|
||
state_path = job_path / "state.json"
|
||
frames = list(job.get("frames") or [])
|
||
generated_videos = list(job.get("generated_videos") or [])
|
||
metadata = {
|
||
"audio_segment_count": len(job.get("transcript") or []),
|
||
"product_ref_count": len(job.get("product_refs") or []),
|
||
"storyboard_image_count": len(job.get("storyboard_images") or []),
|
||
}
|
||
with self.connect() as conn:
|
||
existing = conn.execute(
|
||
"SELECT created_at FROM documents WHERE id = ?",
|
||
(doc["document_id"],),
|
||
).fetchone()
|
||
created_at = float(existing["created_at"]) if existing else now
|
||
conn.execute(
|
||
"""
|
||
INSERT INTO documents(
|
||
id, title, source_kind, workflow_mode, source_url, primary_job_id,
|
||
status, storage_prefix, metadata_json, created_at, updated_at
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
ON CONFLICT(id) DO UPDATE SET
|
||
title = excluded.title,
|
||
source_kind = excluded.source_kind,
|
||
workflow_mode = excluded.workflow_mode,
|
||
source_url = excluded.source_url,
|
||
primary_job_id = excluded.primary_job_id,
|
||
status = excluded.status,
|
||
storage_prefix = excluded.storage_prefix,
|
||
metadata_json = excluded.metadata_json,
|
||
updated_at = excluded.updated_at
|
||
""",
|
||
(
|
||
doc["document_id"],
|
||
doc["title"],
|
||
doc["source_kind"],
|
||
doc["workflow_mode"],
|
||
str(job.get("url") or ""),
|
||
job_id,
|
||
str(job.get("status") or "created"),
|
||
doc["storage_prefix"],
|
||
json.dumps(metadata, ensure_ascii=False),
|
||
created_at,
|
||
now,
|
||
),
|
||
)
|
||
existing_job = conn.execute("SELECT created_at FROM jobs WHERE id = ?", (job_id,)).fetchone()
|
||
job_created_at = float(existing_job["created_at"]) if existing_job else now
|
||
conn.execute(
|
||
"""
|
||
INSERT INTO jobs(
|
||
id, document_id, source_kind, workflow_mode, source_url, status,
|
||
progress, message, storage_path, state_path, video_url, duration,
|
||
width, height, frame_count, video_count, error, metadata_json,
|
||
created_at, updated_at
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
ON CONFLICT(id) DO UPDATE SET
|
||
document_id = excluded.document_id,
|
||
source_kind = excluded.source_kind,
|
||
workflow_mode = excluded.workflow_mode,
|
||
source_url = excluded.source_url,
|
||
status = excluded.status,
|
||
progress = excluded.progress,
|
||
message = excluded.message,
|
||
storage_path = excluded.storage_path,
|
||
state_path = excluded.state_path,
|
||
video_url = excluded.video_url,
|
||
duration = excluded.duration,
|
||
width = excluded.width,
|
||
height = excluded.height,
|
||
frame_count = excluded.frame_count,
|
||
video_count = excluded.video_count,
|
||
error = excluded.error,
|
||
metadata_json = excluded.metadata_json,
|
||
updated_at = excluded.updated_at
|
||
""",
|
||
(
|
||
job_id,
|
||
doc["document_id"],
|
||
doc["source_kind"],
|
||
doc["workflow_mode"],
|
||
str(job.get("url") or ""),
|
||
str(job.get("status") or "created"),
|
||
int(job.get("progress") or 0),
|
||
str(job.get("message") or ""),
|
||
str(job_path),
|
||
str(state_path),
|
||
str(job.get("video_url") or ""),
|
||
float(job.get("duration") or 0),
|
||
int(job.get("width") or 0),
|
||
int(job.get("height") or 0),
|
||
len(frames),
|
||
len(generated_videos),
|
||
str(job.get("error") or ""),
|
||
json.dumps(metadata, ensure_ascii=False),
|
||
job_created_at,
|
||
now,
|
||
),
|
||
)
|
||
conn.execute("DELETE FROM media_assets WHERE job_id = ?", (job_id,))
|
||
for asset in self._job_assets(job, job_path, doc["document_id"]):
|
||
conn.execute(
|
||
"""
|
||
INSERT INTO media_assets(
|
||
id, document_id, job_id, kind, role, path, url, frame_index,
|
||
timestamp, width, height, duration, metadata_json, created_at, updated_at
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
asset["id"],
|
||
asset["document_id"],
|
||
asset["job_id"],
|
||
asset["kind"],
|
||
asset["role"],
|
||
asset.get("path", ""),
|
||
asset.get("url", ""),
|
||
asset.get("frame_index"),
|
||
asset.get("timestamp"),
|
||
int(asset.get("width") or 0),
|
||
int(asset.get("height") or 0),
|
||
float(asset.get("duration") or 0),
|
||
json.dumps(asset.get("metadata") or {}, ensure_ascii=False),
|
||
now,
|
||
now,
|
||
),
|
||
)
|
||
|
||
def _job_assets(self, job: dict[str, Any], job_path: Path, document_id: str) -> list[dict[str, Any]]:
|
||
job_id = str(job.get("id") or "")
|
||
items: list[dict[str, Any]] = []
|
||
|
||
def add(
|
||
asset_id: str,
|
||
kind: str,
|
||
role: str,
|
||
path: Path | str = "",
|
||
url: str = "",
|
||
frame_index: int | None = None,
|
||
timestamp: float | None = None,
|
||
width: int = 0,
|
||
height: int = 0,
|
||
duration: float = 0.0,
|
||
metadata: dict[str, Any] | None = None,
|
||
) -> None:
|
||
items.append({
|
||
"id": asset_id,
|
||
"document_id": document_id,
|
||
"job_id": job_id,
|
||
"kind": kind,
|
||
"role": role,
|
||
"path": str(path) if path else "",
|
||
"url": url,
|
||
"frame_index": frame_index,
|
||
"timestamp": timestamp,
|
||
"width": width,
|
||
"height": height,
|
||
"duration": duration,
|
||
"metadata": metadata or {},
|
||
})
|
||
|
||
if (job_path / "source.mp4").exists() or job.get("video_url"):
|
||
add(
|
||
f"{job_id}:source_video",
|
||
"video",
|
||
"source_video",
|
||
job_path / "source.mp4",
|
||
str(job.get("video_url") or f"/jobs/{job_id}/video.mp4"),
|
||
duration=float(job.get("duration") or 0),
|
||
width=int(job.get("width") or 0),
|
||
height=int(job.get("height") or 0),
|
||
)
|
||
if (job_path / "audio.wav").exists() or job.get("source_audio_url"):
|
||
add(
|
||
f"{job_id}:source_audio",
|
||
"audio",
|
||
"source_audio",
|
||
job_path / "audio.wav",
|
||
str(job.get("source_audio_url") or f"/jobs/{job_id}/audio.wav"),
|
||
duration=float(job.get("duration") or 0),
|
||
)
|
||
|
||
for frame in job.get("frames") or []:
|
||
idx = int(frame.get("index") or 0)
|
||
add(
|
||
f"{job_id}:frame:{idx}",
|
||
"image",
|
||
"keyframe",
|
||
job_path / "frames" / f"{idx:03d}.jpg",
|
||
str(frame.get("url") or f"/jobs/{job_id}/frames/{idx}.jpg"),
|
||
frame_index=idx,
|
||
timestamp=float(frame.get("timestamp") or 0),
|
||
metadata={"quality_report": frame.get("quality_report")},
|
||
)
|
||
if frame.get("cleaned_url"):
|
||
add(
|
||
f"{job_id}:frame:{idx}:cleaned",
|
||
"image",
|
||
"cleaned_keyframe",
|
||
job_path / "cleaned" / f"{idx:03d}.jpg",
|
||
str(frame.get("cleaned_url")),
|
||
frame_index=idx,
|
||
timestamp=float(frame.get("timestamp") or 0),
|
||
)
|
||
for generated in frame.get("generated_images") or []:
|
||
gen_id = str(generated.get("id") or "")
|
||
if gen_id:
|
||
add(
|
||
f"{job_id}:generated_image:{idx}:{gen_id}",
|
||
"image",
|
||
"generated_image",
|
||
job_path / "gen" / f"{idx:03d}_{gen_id}.jpg",
|
||
str(generated.get("url") or ""),
|
||
frame_index=idx,
|
||
metadata={"model": generated.get("model"), "mode": generated.get("mode")},
|
||
)
|
||
for scene_asset in frame.get("scene_assets") or []:
|
||
asset_id = str(scene_asset.get("id") or "")
|
||
if asset_id:
|
||
add(
|
||
f"{job_id}:scene_asset:{asset_id}",
|
||
"image",
|
||
str(scene_asset.get("asset_role") or "scene_asset"),
|
||
job_path / "assets" / f"{asset_id}.jpg",
|
||
str(scene_asset.get("url") or ""),
|
||
frame_index=idx,
|
||
width=int(scene_asset.get("width") or 0),
|
||
height=int(scene_asset.get("height") or 0),
|
||
metadata={"label": scene_asset.get("label"), "scene_mode": scene_asset.get("scene_mode")},
|
||
)
|
||
for element in frame.get("elements") or []:
|
||
element_id = str(element.get("id") or "")
|
||
cutout_ids = list(element.get("cutouts") or [])
|
||
legacy_cutout = element.get("cutout_id")
|
||
if legacy_cutout and legacy_cutout not in cutout_ids:
|
||
cutout_ids.append(legacy_cutout)
|
||
for cutout_id in cutout_ids:
|
||
add(
|
||
f"{job_id}:cutout:{idx}:{element_id}:{cutout_id}",
|
||
"image",
|
||
"element_cutout",
|
||
job_path / "elements" / f"{idx:03d}_{element_id}_{cutout_id}.jpg",
|
||
f"/jobs/{job_id}/frames/{idx}/elements/{element_id}/cutouts/{cutout_id}.jpg",
|
||
frame_index=idx,
|
||
metadata={"element_id": element_id, "name_zh": element.get("name_zh")},
|
||
)
|
||
for subject_asset in element.get("subject_assets") or []:
|
||
asset_id = str(subject_asset.get("id") or "")
|
||
if asset_id:
|
||
add(
|
||
f"{job_id}:subject_asset:{asset_id}",
|
||
"image",
|
||
"subject_asset",
|
||
job_path / "assets" / f"{asset_id}.jpg",
|
||
str(subject_asset.get("url") or ""),
|
||
frame_index=idx,
|
||
width=int(subject_asset.get("width") or 0),
|
||
height=int(subject_asset.get("height") or 0),
|
||
metadata={"view": subject_asset.get("view"), "label": subject_asset.get("label")},
|
||
)
|
||
|
||
for ref in job.get("product_refs") or []:
|
||
asset_id = str(ref.get("id") or ref.get("asset_id") or ref.get("url") or "")
|
||
if asset_id:
|
||
add(
|
||
f"{job_id}:product_ref:{asset_id}",
|
||
"image",
|
||
"product_ref",
|
||
self._path_from_job_url(job_path, job_id, str(ref.get("url") or "")),
|
||
str(ref.get("url") or ""),
|
||
metadata=ref,
|
||
)
|
||
|
||
for video in job.get("generated_videos") or []:
|
||
video_id = str(video.get("id") or "")
|
||
if video_id:
|
||
add(
|
||
f"{job_id}:generated_video:{video_id}",
|
||
"video",
|
||
"generated_video",
|
||
job_path / "videos" / f"{video_id}.mp4",
|
||
str(video.get("url") or ""),
|
||
frame_index=video.get("frame_idx"),
|
||
duration=float(video.get("duration") or 0),
|
||
metadata={"status": video.get("status"), "model": video.get("model"), "error": video.get("error")},
|
||
)
|
||
return items
|
||
|
||
def _path_from_job_url(self, job_path: Path, job_id: str, url: str) -> str:
|
||
prefix = f"/jobs/{job_id}/"
|
||
if not url.startswith(prefix):
|
||
return ""
|
||
tail = url[len(prefix):]
|
||
if tail == "video.mp4":
|
||
return str(job_path / "source.mp4")
|
||
return str(job_path / tail)
|
||
|
||
def delete_job(self, job_id: str) -> None:
|
||
if not self.enabled:
|
||
return
|
||
with self.connect() as conn:
|
||
row = conn.execute("SELECT document_id FROM jobs WHERE id = ?", (job_id,)).fetchone()
|
||
conn.execute("DELETE FROM jobs WHERE id = ?", (job_id,))
|
||
if row:
|
||
remaining = conn.execute(
|
||
"SELECT COUNT(*) AS c FROM jobs WHERE document_id = ?",
|
||
(row["document_id"],),
|
||
).fetchone()
|
||
if int(remaining["c"] or 0) == 0:
|
||
conn.execute("DELETE FROM documents WHERE id = ?", (row["document_id"],))
|
||
|
||
def list_documents(self, limit: int | None = None) -> list[dict[str, Any]]:
|
||
sql = """
|
||
SELECT
|
||
d.*,
|
||
COUNT(DISTINCT j.id) AS job_count,
|
||
COUNT(DISTINCT a.id) AS asset_count
|
||
FROM documents d
|
||
LEFT JOIN jobs j ON j.document_id = d.id
|
||
LEFT JOIN media_assets a ON a.document_id = d.id
|
||
GROUP BY d.id
|
||
ORDER BY d.updated_at DESC
|
||
"""
|
||
params: tuple[Any, ...] = ()
|
||
if limit is not None and limit > 0:
|
||
sql += " LIMIT ?"
|
||
params = (limit,)
|
||
with self.connect() as conn:
|
||
rows = conn.execute(sql, params).fetchall()
|
||
return [dict(row) for row in rows]
|
||
|
||
def health(self) -> dict[str, Any]:
|
||
if not self.enabled:
|
||
return {"enabled": False, "url": redact_database_url(self.url), "error": self.error}
|
||
try:
|
||
with self.connect() as conn:
|
||
docs = conn.execute("SELECT COUNT(*) AS c FROM documents").fetchone()["c"]
|
||
jobs = conn.execute("SELECT COUNT(*) AS c FROM jobs").fetchone()["c"]
|
||
assets = conn.execute("SELECT COUNT(*) AS c FROM media_assets").fetchone()["c"]
|
||
return {
|
||
"enabled": True,
|
||
"url": redact_database_url(self.url),
|
||
"schema_version": SCHEMA_VERSION,
|
||
"documents": int(docs or 0),
|
||
"jobs": int(jobs or 0),
|
||
"assets": int(assets or 0),
|
||
}
|
||
except Exception as e:
|
||
return {"enabled": False, "url": redact_database_url(self.url), "error": str(e)}
|
||
|
||
|
||
def create_database(url: str, jobs_dir: Path) -> AppDatabase:
    """Construct an AppDatabase for *url* and create its schema before returning it."""
    database = AppDatabase(url, jobs_dir)
    database.init()
    return database
|