diff --git a/.memory/status.md b/.memory/status.md index 24404c0..26538c0 100644 --- a/.memory/status.md +++ b/.memory/status.md @@ -43,6 +43,7 @@ ## 关键文件 - `api/main.py` — FastAPI 后端、模型路由、任务状态、ASR/翻译/音频分析、生图、产品识别、首尾帧和视频接口。 +- `api/database.py` — 后端数据库层;当前用 SQLite 保存 document / job / media asset 元数据,媒体文件仍在 `jobs/<job_id>/`。 - `api/.env.example` — 本地模型和网关模板;已包含 `GPT_TEXT_MODEL=gpt-4o`。 - `deploy/.env.production.example` — 生产环境模板;视频默认 SKG Doubao / Seedance 网关。 - `RULES.md` — 启动、部署事实、模型环境变量和项目规则。 @@ -54,6 +55,7 @@ ## 主要 API ``` GET /health +GET /documents POST /jobs POST /jobs/upload GET /jobs @@ -89,10 +91,12 @@ POST /jobs/{id}/frames/{idx}/storyboard/video 6. 当前主流程不直接批量提交视频;先走“分镜规划 → 首尾帧 → 人工审核”。 7. 产品素材池默认是“同一产品”,不做不同产品身份判断;视角识别必须按佩戴者左 / 右、上 / 下、内 / 外侧描述。 8. 自动抽帧默认是 `frames=6` + `target=random_subject` + `quality=accurate` + `mode=replace`;如果需要特定动作或表情,用“当前点抽帧”手动补。 -8. 后端长任务不要用 `--reload`。 -9. 关键帧 `index` 是稳定 ID,不等于数组下标;前端取帧用 `frames.find(x => x.index === idx)`。 +9. 文档是顶层业务归类:每个 TK 链接或上传视频默认一个 `document`,`job` 归属到 `document_id`;DB 存元数据和文件索引,视频 / 图片 / 音频文件不进 DB。 +10. 后端长任务不要用 `--reload`。 +11. 
关键帧 `index` 是稳定 ID,不等于数组下标;前端取帧用 `frames.find(x => x.index === idx)`。 ## 最近变更 +- 2026-05-18:新增后端数据库层,SQLite 默认落在 `APP_DB_URL` / `DATABASE_URL` 或 `JOBS_DIR/app.db`;`/documents` 返回文档归类列表,`/health.database` 返回 DB 状态。 - 2026-05-18:`VISION_MODEL`、`REWRITE_MODEL`、`AUDIO_REWRITE_MODEL` 切到 GPT 默认模型 `gpt-4o`,并加旧 Gemini 环境变量归一化保护。 - 2026-05-18:语音通道固定 Azure OpenAI TTS,移除 MiniMax fallback,并按 `AZURE_TTS_PATHS` 尝试语音路径。 - 2026-05-18:当前主路径暂停直接提交视频,改为逐条首尾帧闸门。 diff --git a/RULES.md b/RULES.md index 3cbed56..3b5218e 100644 --- a/RULES.md +++ b/RULES.md @@ -24,7 +24,7 @@ - 服务器目录:`/opt/skg-marketing-studio` - 生产启动:`docker compose -f docker-compose.prod.yml --env-file deploy/.env.production up -d --build` - 生产架构:`web` 容器用 Nginx 承载 Next 静态导出;`/login/`、`/_next/`、`/assets/`、`/skg-logo-black.svg`、`/oasis-source/` 等登录页必需静态资源公开访问;未登录访问工作台跳转 `/login/`,`/api/` 通过 Nginx `auth_request` 校验 FastAPI 会话 Cookie 后反代到 `skg-marketing-api:4291`;Traefik 通过 `coolify` 外部网络接入 80/443 -- 持久化目录:服务器 `./data/jobs` 挂载到后端 `/data/jobs` +- 持久化目录:服务器 `./data/jobs` 挂载到后端 `/data/jobs`;默认后端数据库为 `APP_DB_URL=sqlite:////data/jobs/app.db`,只存文档 / job / 媒体资产元数据和文件索引,原视频、音频、抽帧、生图、视频候选仍放在 `/data/jobs/<job_id>/` - 登录凭证:用户名写下方快捷登录;密码明文备份只放服务器 `/root/skg-marketing-studio-login.txt`,生产环境变量 `WEB_AUTH_PASSWORD` / `WEB_AUTH_SESSION_SECRET` 只放服务器 `deploy/.env.production` ## 快捷登录 @@ -70,6 +70,7 @@ - `AZURE_TTS_MODEL` / `AZURE_TTS_VOICE_ID` / `AZURE_TTS_VOICE_POOL` / `AZURE_TTS_PATH` / `AZURE_TTS_PATHS`:Azure OpenAI TTS 模型、默认音色、音色池和 OpenAI 协议语音路径;后端会按 `AZURE_TTS_PATHS` 依次尝试,便于区分路径不对和整条语音服务不可用 - MiniMax TTS 不再作为语音 fallback;不要新增或依赖 `MINIMAX_*` 配置 - `POE_API_KEY` / `VIDEO_API_KEY`:视频生成通道 Key,只能放本地环境变量 +- `APP_DB_URL` / `DATABASE_URL`:后端元数据数据库;当前内置实现支持 `sqlite:///`,生产默认 `sqlite:////data/jobs/app.db`。文档归类以 `documents` 为顶层,一条 TK 链接或一次上传默认一个 document,`jobs` 和 `media_assets` 归属到 `document_id`。 - `WEB_AUTH_USERNAME` / `WEB_AUTH_PASSWORD` / `WEB_AUTH_SESSION_SECRET`:生产网页登录和会话签名配置;密码和 session secret 只放服务器环境变量,不入库 - `FFMPEG_BIN` / `FFPROBE_BIN`:可选本地媒体二进制路径;本机 
# Backend metadata database layer (api/database.py).
#
# Keeps document / job / media-asset metadata in SQLite. Media files are only
# referenced by path and URL; their bytes never enter the database.
from __future__ import annotations

import json
import os
import sqlite3
import time
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import Any


SCHEMA_VERSION = 1

# Idempotent DDL: safe to run on every start-up.
_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_meta (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS documents (
    id TEXT PRIMARY KEY,
    title TEXT NOT NULL,
    source_kind TEXT NOT NULL,
    workflow_mode TEXT NOT NULL,
    source_url TEXT NOT NULL DEFAULT '',
    primary_job_id TEXT NOT NULL DEFAULT '',
    status TEXT NOT NULL DEFAULT 'created',
    storage_prefix TEXT NOT NULL,
    metadata_json TEXT NOT NULL DEFAULT '{}',
    created_at REAL NOT NULL,
    updated_at REAL NOT NULL
);

CREATE TABLE IF NOT EXISTS jobs (
    id TEXT PRIMARY KEY,
    document_id TEXT NOT NULL,
    source_kind TEXT NOT NULL,
    workflow_mode TEXT NOT NULL,
    source_url TEXT NOT NULL DEFAULT '',
    status TEXT NOT NULL,
    progress INTEGER NOT NULL DEFAULT 0,
    message TEXT NOT NULL DEFAULT '',
    storage_path TEXT NOT NULL,
    state_path TEXT NOT NULL,
    video_url TEXT NOT NULL DEFAULT '',
    duration REAL NOT NULL DEFAULT 0,
    width INTEGER NOT NULL DEFAULT 0,
    height INTEGER NOT NULL DEFAULT 0,
    frame_count INTEGER NOT NULL DEFAULT 0,
    video_count INTEGER NOT NULL DEFAULT 0,
    error TEXT NOT NULL DEFAULT '',
    metadata_json TEXT NOT NULL DEFAULT '{}',
    created_at REAL NOT NULL,
    updated_at REAL NOT NULL,
    FOREIGN KEY(document_id) REFERENCES documents(id) ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS media_assets (
    id TEXT PRIMARY KEY,
    document_id TEXT NOT NULL,
    job_id TEXT NOT NULL,
    kind TEXT NOT NULL,
    role TEXT NOT NULL,
    path TEXT NOT NULL DEFAULT '',
    url TEXT NOT NULL DEFAULT '',
    frame_index INTEGER,
    timestamp REAL,
    width INTEGER NOT NULL DEFAULT 0,
    height INTEGER NOT NULL DEFAULT 0,
    duration REAL NOT NULL DEFAULT 0,
    metadata_json TEXT NOT NULL DEFAULT '{}',
    created_at REAL NOT NULL,
    updated_at REAL NOT NULL,
    FOREIGN KEY(document_id) REFERENCES documents(id) ON DELETE CASCADE,
    FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_documents_updated_at ON documents(updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_documents_source_kind ON documents(source_kind);
CREATE INDEX IF NOT EXISTS idx_documents_workflow_mode ON documents(workflow_mode);
CREATE INDEX IF NOT EXISTS idx_jobs_document_id ON jobs(document_id);
CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_assets_document_id ON media_assets(document_id);
CREATE INDEX IF NOT EXISTS idx_assets_job_id ON media_assets(job_id);
CREATE INDEX IF NOT EXISTS idx_assets_role ON media_assets(role);
"""

# Neither upsert lists created_at in its SET clause, so an existing row keeps
# its original creation time without a separate lookup.
_DOCUMENT_UPSERT_SQL = """
INSERT INTO documents(
    id, title, source_kind, workflow_mode, source_url, primary_job_id,
    status, storage_prefix, metadata_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
    title = excluded.title,
    source_kind = excluded.source_kind,
    workflow_mode = excluded.workflow_mode,
    source_url = excluded.source_url,
    primary_job_id = excluded.primary_job_id,
    status = excluded.status,
    storage_prefix = excluded.storage_prefix,
    metadata_json = excluded.metadata_json,
    updated_at = excluded.updated_at
"""

_JOB_UPSERT_SQL = """
INSERT INTO jobs(
    id, document_id, source_kind, workflow_mode, source_url, status,
    progress, message, storage_path, state_path, video_url, duration,
    width, height, frame_count, video_count, error, metadata_json,
    created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
    document_id = excluded.document_id,
    source_kind = excluded.source_kind,
    workflow_mode = excluded.workflow_mode,
    source_url = excluded.source_url,
    status = excluded.status,
    progress = excluded.progress,
    message = excluded.message,
    storage_path = excluded.storage_path,
    state_path = excluded.state_path,
    video_url = excluded.video_url,
    duration = excluded.duration,
    width = excluded.width,
    height = excluded.height,
    frame_count = excluded.frame_count,
    video_count = excluded.video_count,
    error = excluded.error,
    metadata_json = excluded.metadata_json,
    updated_at = excluded.updated_at
"""

_ASSET_INSERT_SQL = """
INSERT INTO media_assets(
    id, document_id, job_id, kind, role, path, url, frame_index,
    timestamp, width, height, duration, metadata_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""

# Correlated sub-queries count each child table on its own index instead of
# joining jobs x assets per document and de-duplicating afterwards.
_LIST_DOCUMENTS_SQL = """
SELECT
    d.*,
    (SELECT COUNT(*) FROM jobs j WHERE j.document_id = d.id) AS job_count,
    (SELECT COUNT(*) FROM media_assets a WHERE a.document_id = d.id) AS asset_count
FROM documents d
ORDER BY d.updated_at DESC
"""


def default_database_url(jobs_dir: Path) -> str:
    """Return the database URL from the environment, else a SQLite file in *jobs_dir*.

    ``APP_DB_URL`` takes precedence over ``DATABASE_URL``; when neither is set
    (or both are empty) the URL points at ``<jobs_dir>/app.db``.
    """
    return os.getenv("APP_DB_URL") or os.getenv("DATABASE_URL") or f"sqlite:///{jobs_dir / 'app.db'}"


def redact_database_url(url: str) -> str:
    """Return *url* with any ``user:password@`` part replaced by ``***@``.

    URLs without a scheme or without ``@`` are returned unchanged. SQLite URLs
    are also returned unchanged: they are file paths, so an ``@`` in them is
    part of a directory or file name rather than a credential separator.
    """
    scheme, sep, rest = url.partition("://")
    if not sep or "@" not in rest:
        return url
    if scheme.lower().startswith("sqlite"):
        return url
    # Everything before the last "@" is dropped, erring towards over-redaction.
    _, host = rest.rsplit("@", 1)
    return f"{scheme}://***@{host}"


def infer_source_kind(url: str) -> str:
    """Classify a job URL as ``"upload"``, ``"tiktok_link"`` or ``"unknown"``."""
    if url.startswith("upload://"):
        return "upload"
    if url.startswith(("http://", "https://")):
        return "tiktok_link"
    return "unknown"


def default_workflow_mode(source_kind: str) -> str:
    """Return the workflow mode implied by *source_kind*."""
    if source_kind == "upload":
        return "uploaded_reference"
    return "feed_recreation"


def document_title(url: str, source_kind: str, fallback: str) -> str:
    """Derive a display title for a document.

    Uploads use the URL with ``upload://`` removed; other sources use the
    stripped URL cut to 120 characters. *fallback* is returned when the result
    would be empty.
    """
    if source_kind == "upload":
        return url.replace("upload://", "", 1).strip() or fallback
    if url:
        return url.strip()[:120]
    return fallback


def storage_prefix(document_id: str, source_kind: str, workflow_mode: str) -> str:
    """Build the logical ``<mode>/<source>/<document_id>`` prefix.

    An empty *source_kind* becomes ``"unknown"`` and an empty *workflow_mode*
    is derived from the source kind.
    """
    source = source_kind or "unknown"
    mode = workflow_mode or default_workflow_mode(source)
    return f"{mode}/{source}/{document_id}"


class AppDatabase:
    """SQLite-backed store for document, job and media-asset metadata."""

    def __init__(self, url: str, jobs_dir: Path):
        """Resolve *url* to a file path; raises ``RuntimeError`` for non-SQLite URLs."""
        self.url = url
        self.jobs_dir = jobs_dir
        self.path = self._sqlite_path(url)
        # Read by sync_job / delete_job / health; this module never clears it.
        self.enabled = True
        self.error = ""

    @staticmethod
    def _sqlite_path(url: str) -> Path:
        """Translate a ``sqlite:///`` URL (or ``:memory:``) into a ``Path``.

        Raises:
            RuntimeError: if *url* is neither ``:memory:`` nor a ``sqlite:///`` URL.
        """
        if url == ":memory:":
            return Path(":memory:")
        if not url.startswith("sqlite:///"):
            raise RuntimeError("当前内置数据库层只支持 sqlite:/// URL;Postgres 迁移会复用同一张表语义。")
        raw = url[len("sqlite:///"):]
        return Path(raw).expanduser().resolve()

    def connect(self) -> sqlite3.Connection:
        """Open a new connection with row access by name and foreign keys enabled.

        The caller owns the connection and must close it.

        NOTE(review): every call opens a separate connection, and SQLite gives
        each ``:memory:`` connection its own empty database, so the schema
        created by ``init()`` is not visible to later calls in that mode.
        """
        if str(self.path) != ":memory:":
            self.path.parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(str(self.path))
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA foreign_keys = ON")
        return conn

    @contextmanager
    def _session(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed or rolled back, then always closed.

        ``with connection:`` on its own only ends the transaction and leaves
        the handle open, so the close is done explicitly here.
        """
        conn = self.connect()
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def init(self) -> None:
        """Create tables and indexes if missing and record the schema version."""
        with self._session() as conn:
            conn.executescript(_SCHEMA_SQL)
            conn.execute(
                "INSERT OR REPLACE INTO schema_meta(key, value) VALUES('schema_version', ?)",
                (str(SCHEMA_VERSION),),
            )

    def normalize_job_document(self, job: dict[str, Any]) -> dict[str, Any]:
        """Fill in the document-level fields of *job*, deriving any that are missing.

        Returns a dict with ``document_id``, ``source_kind``, ``workflow_mode``,
        ``storage_prefix`` and ``title``. The document id defaults to the job id.
        """
        job_id = str(job.get("id") or "")
        source_url = str(job.get("url") or "")
        source_kind = str(job.get("source_kind") or "") or infer_source_kind(source_url)
        workflow_mode = str(job.get("workflow_mode") or "") or default_workflow_mode(source_kind)
        document_id = str(job.get("document_id") or "") or job_id
        prefix = str(job.get("storage_prefix") or "") or storage_prefix(document_id, source_kind, workflow_mode)
        return {
            "document_id": document_id,
            "source_kind": source_kind,
            "workflow_mode": workflow_mode,
            "storage_prefix": prefix,
            "title": document_title(source_url, source_kind, document_id),
        }

    def sync_job(self, job: dict[str, Any], job_path: Path) -> None:
        """Mirror one job (as a plain dict) into the database.

        Upserts the owning document and the job row, then replaces the job's
        media-asset rows, all in a single transaction. Does nothing when the
        database is disabled or the job has no id.
        """
        if not self.enabled:
            return
        now = time.time()
        job_id = str(job.get("id") or "")
        if not job_id:
            return
        doc = self.normalize_job_document(job)
        metadata_json = json.dumps(
            {
                "audio_segment_count": len(job.get("transcript") or []),
                "product_ref_count": len(job.get("product_refs") or []),
                "storyboard_image_count": len(job.get("storyboard_images") or []),
            },
            ensure_ascii=False,
        )
        assets = self._job_assets(job, job_path, doc["document_id"])
        with self._session() as conn:
            self._upsert_document(conn, job, job_id, doc, metadata_json, now)
            self._upsert_job(conn, job, job_id, job_path, doc, metadata_json, now)
            self._replace_assets(conn, job_id, assets, now)

    def _upsert_document(
        self,
        conn: sqlite3.Connection,
        job: dict[str, Any],
        job_id: str,
        doc: dict[str, Any],
        metadata_json: str,
        now: float,
    ) -> None:
        """Insert or refresh the document row that owns *job*."""
        conn.execute(
            _DOCUMENT_UPSERT_SQL,
            (
                doc["document_id"],
                doc["title"],
                doc["source_kind"],
                doc["workflow_mode"],
                str(job.get("url") or ""),
                job_id,
                str(job.get("status") or "created"),
                doc["storage_prefix"],
                metadata_json,
                now,  # created_at: only used when the row is new
                now,
            ),
        )

    def _upsert_job(
        self,
        conn: sqlite3.Connection,
        job: dict[str, Any],
        job_id: str,
        job_path: Path,
        doc: dict[str, Any],
        metadata_json: str,
        now: float,
    ) -> None:
        """Insert or refresh the row for *job* itself."""
        conn.execute(
            _JOB_UPSERT_SQL,
            (
                job_id,
                doc["document_id"],
                doc["source_kind"],
                doc["workflow_mode"],
                str(job.get("url") or ""),
                str(job.get("status") or "created"),
                int(job.get("progress") or 0),
                str(job.get("message") or ""),
                str(job_path),
                str(job_path / "state.json"),
                str(job.get("video_url") or ""),
                float(job.get("duration") or 0),
                int(job.get("width") or 0),
                int(job.get("height") or 0),
                len(job.get("frames") or []),
                len(job.get("generated_videos") or []),
                str(job.get("error") or ""),
                metadata_json,
                now,  # created_at: only used when the row is new
                now,
            ),
        )

    def _replace_assets(
        self,
        conn: sqlite3.Connection,
        job_id: str,
        assets: list[dict[str, Any]],
        now: float,
    ) -> None:
        """Swap the job's asset rows for *assets*, keeping known creation times."""
        # Remember creation times before the delete so a re-sync does not make
        # every asset look newly created.
        known_created = {
            row["id"]: float(row["created_at"])
            for row in conn.execute(
                "SELECT id, created_at FROM media_assets WHERE job_id = ?",
                (job_id,),
            )
        }
        conn.execute("DELETE FROM media_assets WHERE job_id = ?", (job_id,))
        conn.executemany(
            _ASSET_INSERT_SQL,
            [
                (
                    asset["id"],
                    asset["document_id"],
                    asset["job_id"],
                    asset["kind"],
                    asset["role"],
                    asset.get("path", ""),
                    asset.get("url", ""),
                    asset.get("frame_index"),
                    asset.get("timestamp"),
                    int(asset.get("width") or 0),
                    int(asset.get("height") or 0),
                    float(asset.get("duration") or 0),
                    json.dumps(asset.get("metadata") or {}, ensure_ascii=False),
                    known_created.get(asset["id"], now),
                    now,
                )
                for asset in assets
            ],
        )

    def _job_assets(self, job: dict[str, Any], job_path: Path, document_id: str) -> list[dict[str, Any]]:
        """List every media asset referenced by *job* as plain dicts.

        Covers the source video and audio, key frames and their cleaned,
        generated, scene, cut-out and subject images, product references and
        generated videos. Asset ids are unique in the result: when the job
        data yields the same id twice, the first occurrence is kept, so one
        repeated entry cannot break the primary key on insert.
        """
        job_id = str(job.get("id") or "")
        items: list[dict[str, Any]] = []
        seen: set[str] = set()

        def add(
            asset_id: str,
            kind: str,
            role: str,
            path: Path | str = "",
            url: str = "",
            frame_index: int | None = None,
            timestamp: float | None = None,
            width: int = 0,
            height: int = 0,
            duration: float = 0.0,
            metadata: dict[str, Any] | None = None,
        ) -> None:
            if asset_id in seen:
                return
            seen.add(asset_id)
            items.append({
                "id": asset_id,
                "document_id": document_id,
                "job_id": job_id,
                "kind": kind,
                "role": role,
                "path": str(path) if path else "",
                "url": url,
                "frame_index": frame_index,
                "timestamp": timestamp,
                "width": width,
                "height": height,
                "duration": duration,
                "metadata": metadata or {},
            })

        if (job_path / "source.mp4").exists() or job.get("video_url"):
            add(
                f"{job_id}:source_video",
                "video",
                "source_video",
                job_path / "source.mp4",
                str(job.get("video_url") or f"/jobs/{job_id}/video.mp4"),
                duration=float(job.get("duration") or 0),
                width=int(job.get("width") or 0),
                height=int(job.get("height") or 0),
            )
        if (job_path / "audio.wav").exists() or job.get("source_audio_url"):
            add(
                f"{job_id}:source_audio",
                "audio",
                "source_audio",
                job_path / "audio.wav",
                str(job.get("source_audio_url") or f"/jobs/{job_id}/audio.wav"),
                duration=float(job.get("duration") or 0),
            )

        for frame in job.get("frames") or []:
            idx = int(frame.get("index") or 0)
            frame_ts = float(frame.get("timestamp") or 0)
            add(
                f"{job_id}:frame:{idx}",
                "image",
                "keyframe",
                job_path / "frames" / f"{idx:03d}.jpg",
                str(frame.get("url") or f"/jobs/{job_id}/frames/{idx}.jpg"),
                frame_index=idx,
                timestamp=frame_ts,
                metadata={"quality_report": frame.get("quality_report")},
            )
            if frame.get("cleaned_url"):
                add(
                    f"{job_id}:frame:{idx}:cleaned",
                    "image",
                    "cleaned_keyframe",
                    job_path / "cleaned" / f"{idx:03d}.jpg",
                    str(frame.get("cleaned_url")),
                    frame_index=idx,
                    timestamp=frame_ts,
                )
            for generated in frame.get("generated_images") or []:
                gen_id = str(generated.get("id") or "")
                if gen_id:
                    add(
                        f"{job_id}:generated_image:{idx}:{gen_id}",
                        "image",
                        "generated_image",
                        job_path / "gen" / f"{idx:03d}_{gen_id}.jpg",
                        str(generated.get("url") or ""),
                        frame_index=idx,
                        metadata={"model": generated.get("model"), "mode": generated.get("mode")},
                    )
            for scene_asset in frame.get("scene_assets") or []:
                asset_id = str(scene_asset.get("id") or "")
                if asset_id:
                    add(
                        f"{job_id}:scene_asset:{asset_id}",
                        "image",
                        str(scene_asset.get("asset_role") or "scene_asset"),
                        job_path / "assets" / f"{asset_id}.jpg",
                        str(scene_asset.get("url") or ""),
                        frame_index=idx,
                        width=int(scene_asset.get("width") or 0),
                        height=int(scene_asset.get("height") or 0),
                        metadata={"label": scene_asset.get("label"), "scene_mode": scene_asset.get("scene_mode")},
                    )
            for element in frame.get("elements") or []:
                element_id = str(element.get("id") or "")
                cutout_ids = list(element.get("cutouts") or [])
                # Older state keeps a single cut-out under "cutout_id".
                legacy_cutout = element.get("cutout_id")
                if legacy_cutout and legacy_cutout not in cutout_ids:
                    cutout_ids.append(legacy_cutout)
                for cutout_id in cutout_ids:
                    add(
                        f"{job_id}:cutout:{idx}:{element_id}:{cutout_id}",
                        "image",
                        "element_cutout",
                        job_path / "elements" / f"{idx:03d}_{element_id}_{cutout_id}.jpg",
                        f"/jobs/{job_id}/frames/{idx}/elements/{element_id}/cutouts/{cutout_id}.jpg",
                        frame_index=idx,
                        metadata={"element_id": element_id, "name_zh": element.get("name_zh")},
                    )
                for subject_asset in element.get("subject_assets") or []:
                    asset_id = str(subject_asset.get("id") or "")
                    if asset_id:
                        add(
                            f"{job_id}:subject_asset:{asset_id}",
                            "image",
                            "subject_asset",
                            job_path / "assets" / f"{asset_id}.jpg",
                            str(subject_asset.get("url") or ""),
                            frame_index=idx,
                            width=int(subject_asset.get("width") or 0),
                            height=int(subject_asset.get("height") or 0),
                            metadata={"view": subject_asset.get("view"), "label": subject_asset.get("label")},
                        )

        for ref in job.get("product_refs") or []:
            asset_id = str(ref.get("id") or ref.get("asset_id") or ref.get("url") or "")
            if asset_id:
                ref_url = str(ref.get("url") or "")
                add(
                    f"{job_id}:product_ref:{asset_id}",
                    "image",
                    "product_ref",
                    self._path_from_job_url(job_path, job_id, ref_url),
                    ref_url,
                    metadata=ref,
                )

        for video in job.get("generated_videos") or []:
            video_id = str(video.get("id") or "")
            if video_id:
                add(
                    f"{job_id}:generated_video:{video_id}",
                    "video",
                    "generated_video",
                    job_path / "videos" / f"{video_id}.mp4",
                    str(video.get("url") or ""),
                    frame_index=video.get("frame_idx"),
                    duration=float(video.get("duration") or 0),
                    metadata={"status": video.get("status"), "model": video.get("model"), "error": video.get("error")},
                )
        return items

    def _path_from_job_url(self, job_path: Path, job_id: str, url: str) -> str:
        """Map a ``/jobs/<job_id>/...`` URL to a path under *job_path*.

        Returns ``""`` for URLs outside that prefix. ``video.mp4`` maps to
        ``source.mp4``.
        """
        prefix = f"/jobs/{job_id}/"
        if not url.startswith(prefix):
            return ""
        tail = url[len(prefix):]
        if tail == "video.mp4":
            return str(job_path / "source.mp4")
        return str(job_path / tail)

    def delete_job(self, job_id: str) -> None:
        """Delete a job and, if it was the document's last job, the document too.

        Asset rows go with the job through ``ON DELETE CASCADE``.
        """
        if not self.enabled:
            return
        with self._session() as conn:
            row = conn.execute("SELECT document_id FROM jobs WHERE id = ?", (job_id,)).fetchone()
            conn.execute("DELETE FROM jobs WHERE id = ?", (job_id,))
            if row is None:
                return
            remaining = conn.execute(
                "SELECT COUNT(*) AS c FROM jobs WHERE document_id = ?",
                (row["document_id"],),
            ).fetchone()
            if int(remaining["c"] or 0) == 0:
                conn.execute("DELETE FROM documents WHERE id = ?", (row["document_id"],))

    def list_documents(self, limit: int | None = None) -> list[dict[str, Any]]:
        """Return documents, most recently updated first, with job and asset counts.

        *limit* caps the number of rows only when it is a positive integer.
        """
        sql = _LIST_DOCUMENTS_SQL
        params: tuple[Any, ...] = ()
        if limit is not None and limit > 0:
            sql += " LIMIT ?"
            params = (limit,)
        with self._session() as conn:
            rows = conn.execute(sql, params).fetchall()
        return [dict(row) for row in rows]

    def health(self) -> dict[str, Any]:
        """Return a status dict with row counts, or ``enabled: False`` plus the error.

        Never raises: any failure while querying is reported in the result.
        """
        if not self.enabled:
            return {"enabled": False, "url": redact_database_url(self.url), "error": self.error}
        try:
            with self._session() as conn:
                docs = conn.execute("SELECT COUNT(*) AS c FROM documents").fetchone()["c"]
                jobs = conn.execute("SELECT COUNT(*) AS c FROM jobs").fetchone()["c"]
                assets = conn.execute("SELECT COUNT(*) AS c FROM media_assets").fetchone()["c"]
            return {
                "enabled": True,
                "url": redact_database_url(self.url),
                "schema_version": SCHEMA_VERSION,
                "documents": int(docs or 0),
                "jobs": int(jobs or 0),
                "assets": int(assets or 0),
            }
        except Exception as e:  # health is a reporting boundary
            return {"enabled": False, "url": redact_database_url(self.url), "error": str(e)}


def create_database(url: str, jobs_dir: Path) -> AppDatabase:
    """Build an ``AppDatabase`` for *url* and make sure its schema exists."""
    db = AppDatabase(url, jobs_dir)
    db.init()
    return db
default_database_url(JOBS_DIR) +DB_INIT_ERROR = "" +try: + DB = create_database(DATABASE_URL, JOBS_DIR) +except Exception as e: + DB = None + DB_INIT_ERROR = str(e) CORS_ORIGINS = [o.strip() for o in os.getenv("CORS_ORIGINS", "http://localhost:4290,http://127.0.0.1:4290").split(",") if o.strip()] PRODUCT_LIBRARY_DIR = Path( os.getenv("PRODUCT_LIBRARY_DIR", Path(__file__).resolve().parent / "product_library" / "skg-products") @@ -542,6 +551,10 @@ class AudioScript(BaseModel): class Job(BaseModel): id: str url: str + document_id: str = "" + source_kind: Literal["tiktok_link", "upload", "unknown"] = "unknown" + workflow_mode: Literal["feed_recreation", "uploaded_reference"] = "feed_recreation" + storage_prefix: str = "" status: JobStatus = "created" progress: int = 0 message: str = "" @@ -641,8 +654,26 @@ def job_with_artifacts(job: Job) -> Job: return job.model_copy(update=updates) +def ensure_job_document_fields(job: Job) -> Job: + source_kind = job.source_kind if job.source_kind != "unknown" else infer_source_kind(job.url) + workflow_mode = job.workflow_mode or default_workflow_mode(source_kind) + document_id = job.document_id or job.id + job.source_kind = source_kind if source_kind in {"tiktok_link", "upload"} else "unknown" + job.workflow_mode = workflow_mode if workflow_mode in {"feed_recreation", "uploaded_reference"} else "feed_recreation" + job.document_id = document_id + job.storage_prefix = job.storage_prefix or storage_prefix(document_id, job.source_kind, job.workflow_mode) + return job + + def save_state(job: Job) -> None: - (job_dir(job.id) / "state.json").write_text(job.model_dump_json(indent=2)) + ensure_job_document_fields(job) + d = job_dir(job.id) + (d / "state.json").write_text(job.model_dump_json(indent=2)) + if DB: + try: + DB.sync_job(job.model_dump(mode="json"), d) + except Exception as e: + print(f"[database sync failed] job={job.id} error={e}", flush=True) def update(job: Job, **kw) -> None: @@ -3024,6 +3055,7 @@ def health() -> dict: 
"base_url": LLM_BASE_URL or "openai-default", "image_base_url": IMAGE_BASE_URL or LLM_BASE_URL or "openai-default", "voice_base_url": AZURE_OPENAI_BASE_URL, + "database": DB.health() if DB else {"enabled": False, "url": DATABASE_URL, "error": DB_INIT_ERROR}, "models": { "asr": ASR_MODEL, "local_asr": LOCAL_ASR_MODEL, @@ -3059,6 +3091,9 @@ def health() -> dict: class JobSummary(BaseModel): id: str + document_id: str = "" + source_kind: str = "unknown" + workflow_mode: str = "feed_recreation" url: str status: JobStatus progress: int = 0 @@ -3074,6 +3109,29 @@ class JobSummary(BaseModel): mtime: float = 0.0 +class DocumentSummary(BaseModel): + id: str + title: str + source_kind: str + workflow_mode: str + source_url: str = "" + primary_job_id: str = "" + status: str = "created" + storage_prefix: str = "" + job_count: int = 0 + asset_count: int = 0 + created_at: float = 0.0 + updated_at: float = 0.0 + + +@app.get("/documents", response_model=list[DocumentSummary]) +def list_documents(limit: int | None = None) -> list[DocumentSummary]: + if not DB: + return [] + rows = DB.list_documents(limit) + return [DocumentSummary(**row) for row in rows] + + @app.get("/jobs", response_model=list[JobSummary]) def list_jobs(limit: int | None = None) -> list[JobSummary]: """所有 job 的精简列表,按磁盘 state.json mtime 倒序(最新优先)。前端无 ?job= 时用它回填历史。""" @@ -3082,8 +3140,12 @@ def list_jobs(limit: int | None = None) -> list[JobSummary]: state_path = JOBS_DIR / job_id / "state.json" mtime = state_path.stat().st_mtime if state_path.exists() else 0.0 thumb = f"/jobs/{job_id}/frames/{job.frames[0].index}.jpg" if job.frames else "" + ensure_job_document_fields(job) items.append(JobSummary( id=job.id, + document_id=job.document_id, + source_kind=job.source_kind, + workflow_mode=job.workflow_mode, url=job.url, status=job.status, progress=job.progress, @@ -3109,7 +3171,7 @@ async def create_job(req: CreateJobReq, bg: BackgroundTasks) -> Job: if not req.url.strip(): raise HTTPException(400, "url required") 
job_id = uuid.uuid4().hex[:12] - job = Job(id=job_id, url=req.url.strip()) + job = Job(id=job_id, url=req.url.strip(), document_id=job_id, source_kind="tiktok_link", workflow_mode="feed_recreation") JOBS[job_id] = job save_state(job) bg.add_task(pipeline_download, job_id) @@ -3133,7 +3195,7 @@ async def create_job_from_upload(bg: BackgroundTasks, file: UploadFile = File(.. if not mp4.exists() or mp4.stat().st_size == 0: raise HTTPException(500, "upload failed") - job = Job(id=job_id, url=f"upload://{file.filename}") + job = Job(id=job_id, url=f"upload://{file.filename}", document_id=job_id, source_kind="upload", workflow_mode="uploaded_reference") JOBS[job_id] = job save_state(job) bg.add_task(pipeline_download, job_id) @@ -3223,6 +3285,11 @@ def delete_job(job_id: str) -> dict[str, bool | str]: job = JOBS.pop(job_id, None) if not job and not d.exists(): raise HTTPException(404, "job not found") + if DB: + try: + DB.delete_job(job_id) + except Exception as e: + print(f"[database delete failed] job={job_id} error={e}", flush=True) if d.exists(): shutil.rmtree(d) return {"ok": True, "id": job_id} diff --git a/deploy/.env.production.example b/deploy/.env.production.example index 4f3f47f..3cd1f02 100644 --- a/deploy/.env.production.example +++ b/deploy/.env.production.example @@ -3,6 +3,7 @@ # Runtime JOBS_DIR=/data/jobs +APP_DB_URL=sqlite:////data/jobs/app.db KEYFRAME_COUNT=6 CORS_ORIGINS=https://marketing.skg.com API_PORT=4291