auto-save 2026-05-18 15:29 (+1, ~5)

This commit is contained in:
2026-05-18 15:29:47 +08:00
parent 408c5fca47
commit 1c451c6ab3
6 changed files with 616 additions and 6 deletions

View File

@@ -43,6 +43,7 @@
## 关键文件
- `api/main.py` — FastAPI 后端、模型路由、任务状态、ASR/翻译/音频分析、生图、产品识别、首尾帧和视频接口。
- `api/database.py` — 后端数据库层;当前用 SQLite 保存 document / job / media asset 元数据,媒体文件仍在 `jobs/<jobId>/`
- `api/.env.example` — 本地模型和网关模板;已包含 `GPT_TEXT_MODEL=gpt-4o`
- `deploy/.env.production.example` — 生产环境模板;视频默认 SKG Doubao / Seedance 网关。
- `RULES.md` — 启动、部署事实、模型环境变量和项目规则。
@@ -54,6 +55,7 @@
## 主要 API
```
GET /health
GET /documents
POST /jobs
POST /jobs/upload
GET /jobs
@@ -89,10 +91,12 @@ POST /jobs/{id}/frames/{idx}/storyboard/video
6. 当前主流程不直接批量提交视频;先走“分镜规划 → 首尾帧 → 人工审核”。
7. 产品素材池默认是“同一产品”,不做不同产品身份判断;视角识别必须按佩戴者左 / 右、上 / 下、内 / 外侧描述。
8. 自动抽帧默认是 `frames=6` + `target=random_subject` + `quality=accurate` + `mode=replace`;如果需要特定动作或表情,用“当前点抽帧”手动补。
8. 后端长任务不要用 `--reload`
9. 关键帧 `index` 是稳定 ID不等于数组下标前端取帧用 `frames.find(x => x.index === idx)`
9. 文档是顶层业务归类：每个 TK 链接或上传视频默认一个 `document`，`job` 归属到 `document_id`；DB 存元数据和文件索引，视频 / 图片 / 音频文件不进 DB。
10. 后端长任务不要用 `--reload`。
11. 关键帧 `index` 是稳定 ID，不等于数组下标；前端取帧用 `frames.find(x => x.index === idx)`。
## 最近变更
- 2026-05-18：新增后端数据库层；SQLite 地址默认取 `APP_DB_URL` / `DATABASE_URL`，都未设置时落在 `JOBS_DIR/app.db`；`/documents` 返回文档归类列表，`/health.database` 返回 DB 状态。
- 2026-05-18：`VISION_MODEL`、`REWRITE_MODEL`、`AUDIO_REWRITE_MODEL` 切到 GPT 默认模型 `gpt-4o`，并加旧 Gemini 环境变量归一化保护。
- 2026-05-18：语音通道固定 Azure OpenAI TTS，移除 MiniMax fallback，并按 `AZURE_TTS_PATHS` 尝试语音路径。
- 2026-05-18：当前主路径暂停直接提交视频，改为逐条首尾帧闸门。

View File

@@ -24,7 +24,7 @@
- 服务器目录:`/opt/skg-marketing-studio`
- 生产启动:`docker compose -f docker-compose.prod.yml --env-file deploy/.env.production up -d --build`
- 生产架构：`web` 容器用 Nginx 承载 Next 静态导出；`/login/`、`/_next/`、`/assets/`、`/skg-logo-black.svg`、`/oasis-source/` 等登录页必需静态资源公开访问；未登录访问工作台跳转 `/login/`；`/api/` 通过 Nginx `auth_request` 校验 FastAPI 会话 Cookie 后反代到 `skg-marketing-api:4291`；Traefik 通过 `coolify` 外部网络接入 80/443。
- 持久化目录:服务器 `./data/jobs` 挂载到后端 `/data/jobs`
- 持久化目录:服务器 `./data/jobs` 挂载到后端 `/data/jobs`;默认后端数据库为 `APP_DB_URL=sqlite:////data/jobs/app.db`,只存文档 / job / 媒体资产元数据和文件索引,原视频、音频、抽帧、生图、视频候选仍放在 `/data/jobs/<jobId>/`
- 登录凭证:用户名写下方快捷登录;密码明文备份只放服务器 `/root/skg-marketing-studio-login.txt`,生产环境变量 `WEB_AUTH_PASSWORD` / `WEB_AUTH_SESSION_SECRET` 只放服务器 `deploy/.env.production`
## 快捷登录
@@ -70,6 +70,7 @@
- `AZURE_TTS_MODEL` / `AZURE_TTS_VOICE_ID` / `AZURE_TTS_VOICE_POOL` / `AZURE_TTS_PATH` / `AZURE_TTS_PATHS`：Azure OpenAI TTS 模型、默认音色、音色池和 OpenAI 协议语音路径；后端会按 `AZURE_TTS_PATHS` 依次尝试，便于区分路径不对和整条语音服务不可用
- MiniMax TTS 不再作为语音 fallback，不要新增或依赖 `MINIMAX_*` 配置
- `POE_API_KEY` / `VIDEO_API_KEY`：视频生成通道 Key，只能放本地环境变量
- `APP_DB_URL` / `DATABASE_URL`：后端元数据数据库；当前内置实现支持 `sqlite:///`，生产默认 `sqlite:////data/jobs/app.db`。文档归类以 `documents` 为顶层，一条 TK 链接或一次上传默认一个 document；`jobs`、`media_assets` 归属到 `document_id`
- `WEB_AUTH_USERNAME` / `WEB_AUTH_PASSWORD` / `WEB_AUTH_SESSION_SECRET`:生产网页登录和会话签名配置;密码和 session secret 只放服务器环境变量,不入库
- `FFMPEG_BIN` / `FFPROBE_BIN`:可选本地媒体二进制路径;本机 Homebrew ffmpeg 动态库损坏时,后端会自动跳过不可用的 PATH 版本并尝试本机静态 ffmpeg 备选,生产仍建议使用系统 ffmpeg/ffprobe
- 生产环境变量:服务器只使用 `deploy/.env.production`,模板为 `deploy/.env.production.example`;真实 Key 不入库

View File

@@ -77,6 +77,7 @@ VIDEO_DURATION_FIELD=seconds
VIDEO_POLL_TIMEOUT_SECONDS=900
# 工作目录
APP_DB_URL=sqlite:///./jobs/app.db
KEYFRAME_COUNT=6
JOBS_DIR=./jobs

536
api/database.py Normal file
View File

@@ -0,0 +1,536 @@
from __future__ import annotations

import contextlib
import json
import os
import sqlite3
import time
from pathlib import Path
from typing import Any, Iterator
# Version of the table layout created by AppDatabase.init(); it is stored in
# schema_meta under the key 'schema_version' and echoed by AppDatabase.health().
SCHEMA_VERSION = 1
def default_database_url(jobs_dir: Path) -> str:
    """Resolve the database URL the backend should use.

    The first non-empty value of the ``APP_DB_URL`` and ``DATABASE_URL``
    environment variables wins (in that order); when neither is set the
    URL points at an ``app.db`` SQLite file inside *jobs_dir*.
    """
    for env_name in ("APP_DB_URL", "DATABASE_URL"):
        configured = os.getenv(env_name)
        if configured:
            return configured
    return f"sqlite:///{jobs_dir / 'app.db'}"
def redact_database_url(url: str) -> str:
    """Return *url* with any credentials replaced by ``***``.

    Everything between ``://`` and the last ``@`` is treated as credentials
    and hidden, so ``scheme://user:pw@host/db`` becomes
    ``scheme://***@host/db``. URLs without ``://``, or without an ``@``
    after it, are returned unchanged.
    """
    scheme, separator, rest = url.partition("://")
    # Look for "@" only after the scheme: an "@" that appears solely before
    # "://" previously passed the guard and then broke the split below.
    if not separator or "@" not in rest:
        return url
    # Split on the LAST "@" so a password that itself contains "@" is still
    # fully hidden.
    host = rest.rsplit("@", 1)[1]
    return f"{scheme}://***@{host}"
def infer_source_kind(url: str) -> str:
    """Classify a job URL by its scheme.

    ``upload://`` URLs map to ``"upload"``, ``http://`` / ``https://`` URLs
    map to ``"tiktok_link"``, and anything else maps to ``"unknown"``.
    """
    if url.startswith("upload://"):
        return "upload"
    is_web_link = url.startswith(("http://", "https://"))
    return "tiktok_link" if is_web_link else "unknown"
def default_workflow_mode(source_kind: str) -> str:
    """Pick the workflow mode implied by a source kind.

    Uploads default to ``"uploaded_reference"``; every other source kind
    defaults to ``"feed_recreation"``.
    """
    return "uploaded_reference" if source_kind == "upload" else "feed_recreation"
def document_title(url: str, source_kind: str, fallback: str) -> str:
    """Derive a human-readable document title from the job's source URL.

    Args:
        url: Source URL of the job (``upload://<filename>`` for uploads).
        source_kind: Source classification; ``"upload"`` selects the
            filename-based title.
        fallback: Title to use when nothing usable can be taken from *url*.

    Returns:
        For uploads, the URL with its first ``upload://`` removed and
        whitespace stripped. Otherwise the stripped URL cut to 120
        characters. *fallback* whenever that would be empty.
    """
    if source_kind == "upload":
        return url.replace("upload://", "", 1).strip() or fallback
    # Strip first and test the result: a whitespace-only URL is truthy but
    # would otherwise produce an empty title instead of the fallback.
    trimmed = url.strip()
    if trimmed:
        return trimmed[:120]
    return fallback
def storage_prefix(document_id: str, source_kind: str, workflow_mode: str) -> str:
    """Build the ``<mode>/<source>/<document_id>`` storage prefix.

    An empty *source_kind* is treated as ``"unknown"``; an empty
    *workflow_mode* is replaced by the default mode for that source kind.
    """
    source = source_kind if source_kind else "unknown"
    mode = workflow_mode if workflow_mode else default_workflow_mode(source)
    return f"{mode}/{source}/{document_id}"
class AppDatabase:
    """SQLite-backed metadata index for documents, jobs and media assets.

    Only metadata and file paths/URLs are stored here; the rows are rebuilt
    from the job state dict passed to :meth:`sync_job`.

    Every operation opens its own short-lived connection through
    :meth:`_transaction`, which commits or rolls back and then closes it.

    NOTE(review): with the ``":memory:"`` URL each ``sqlite3.connect`` call
    creates a separate, empty in-memory database, so tables created by
    :meth:`init` are not visible to later calls. Use a file URL for real use.
    """

    def __init__(self, url: str, jobs_dir: Path):
        """Store the configuration and resolve the SQLite file path.

        Raises:
            RuntimeError: if *url* is neither ``":memory:"`` nor a
                ``sqlite:///`` URL.
        """
        self.url = url
        self.jobs_dir = jobs_dir
        self.path = self._sqlite_path(url)
        self.enabled = True
        self.error = ""

    @staticmethod
    def _sqlite_path(url: str) -> Path:
        """Translate a database URL into the filesystem path of the SQLite file.

        ``":memory:"`` is passed through as ``Path(":memory:")``. For
        ``sqlite:///<path>`` the part after the three slashes is expanded
        (``~``) and resolved to an absolute path.

        Raises:
            RuntimeError: for any other URL form.
        """
        if url == ":memory:":
            return Path(":memory:")
        if not url.startswith("sqlite:///"):
            raise RuntimeError("当前内置数据库层只支持 sqlite:/// URLPostgres 迁移会复用同一张表语义。")
        raw = url[len("sqlite:///"):]
        return Path(raw).expanduser().resolve()

    def connect(self) -> sqlite3.Connection:
        """Open a new connection with ``sqlite3.Row`` rows and foreign keys on.

        The parent directory of the database file is created when missing.
        The caller owns the returned connection and must close it.
        """
        if str(self.path) != ":memory:":
            self.path.parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(str(self.path))
        conn.row_factory = sqlite3.Row
        # SQLite leaves foreign keys off per connection by default; the
        # ON DELETE CASCADE clauses in the schema rely on this pragma.
        conn.execute("PRAGMA foreign_keys = ON")
        return conn

    @contextlib.contextmanager
    def _transaction(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection, commit or roll back on exit, then close it.

        ``with connection:`` on its own only ends the transaction and leaves
        the connection open, so each call would leak a handle until garbage
        collection. This wrapper adds the missing ``close()``.
        """
        conn = self.connect()
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def init(self) -> None:
        """Create tables and indexes if missing and record the schema version."""
        with self._transaction() as conn:
            conn.executescript(
                """
                CREATE TABLE IF NOT EXISTS schema_meta (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL
                );
                CREATE TABLE IF NOT EXISTS documents (
                    id TEXT PRIMARY KEY,
                    title TEXT NOT NULL,
                    source_kind TEXT NOT NULL,
                    workflow_mode TEXT NOT NULL,
                    source_url TEXT NOT NULL DEFAULT '',
                    primary_job_id TEXT NOT NULL DEFAULT '',
                    status TEXT NOT NULL DEFAULT 'created',
                    storage_prefix TEXT NOT NULL,
                    metadata_json TEXT NOT NULL DEFAULT '{}',
                    created_at REAL NOT NULL,
                    updated_at REAL NOT NULL
                );
                CREATE TABLE IF NOT EXISTS jobs (
                    id TEXT PRIMARY KEY,
                    document_id TEXT NOT NULL,
                    source_kind TEXT NOT NULL,
                    workflow_mode TEXT NOT NULL,
                    source_url TEXT NOT NULL DEFAULT '',
                    status TEXT NOT NULL,
                    progress INTEGER NOT NULL DEFAULT 0,
                    message TEXT NOT NULL DEFAULT '',
                    storage_path TEXT NOT NULL,
                    state_path TEXT NOT NULL,
                    video_url TEXT NOT NULL DEFAULT '',
                    duration REAL NOT NULL DEFAULT 0,
                    width INTEGER NOT NULL DEFAULT 0,
                    height INTEGER NOT NULL DEFAULT 0,
                    frame_count INTEGER NOT NULL DEFAULT 0,
                    video_count INTEGER NOT NULL DEFAULT 0,
                    error TEXT NOT NULL DEFAULT '',
                    metadata_json TEXT NOT NULL DEFAULT '{}',
                    created_at REAL NOT NULL,
                    updated_at REAL NOT NULL,
                    FOREIGN KEY(document_id) REFERENCES documents(id) ON DELETE CASCADE
                );
                CREATE TABLE IF NOT EXISTS media_assets (
                    id TEXT PRIMARY KEY,
                    document_id TEXT NOT NULL,
                    job_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    role TEXT NOT NULL,
                    path TEXT NOT NULL DEFAULT '',
                    url TEXT NOT NULL DEFAULT '',
                    frame_index INTEGER,
                    timestamp REAL,
                    width INTEGER NOT NULL DEFAULT 0,
                    height INTEGER NOT NULL DEFAULT 0,
                    duration REAL NOT NULL DEFAULT 0,
                    metadata_json TEXT NOT NULL DEFAULT '{}',
                    created_at REAL NOT NULL,
                    updated_at REAL NOT NULL,
                    FOREIGN KEY(document_id) REFERENCES documents(id) ON DELETE CASCADE,
                    FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_documents_updated_at ON documents(updated_at DESC);
                CREATE INDEX IF NOT EXISTS idx_documents_source_kind ON documents(source_kind);
                CREATE INDEX IF NOT EXISTS idx_documents_workflow_mode ON documents(workflow_mode);
                CREATE INDEX IF NOT EXISTS idx_jobs_document_id ON jobs(document_id);
                CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
                CREATE INDEX IF NOT EXISTS idx_assets_document_id ON media_assets(document_id);
                CREATE INDEX IF NOT EXISTS idx_assets_job_id ON media_assets(job_id);
                CREATE INDEX IF NOT EXISTS idx_assets_role ON media_assets(role);
                """
            )
            conn.execute(
                "INSERT OR REPLACE INTO schema_meta(key, value) VALUES('schema_version', ?)",
                (str(SCHEMA_VERSION),),
            )

    def normalize_job_document(self, job: dict[str, Any]) -> dict[str, Any]:
        """Fill in the document-level fields of a job state dict.

        Missing values are derived: ``source_kind`` from the URL,
        ``workflow_mode`` from the source kind, ``document_id`` from the job
        id, and ``storage_prefix`` from those three.

        Returns:
            A dict with ``document_id``, ``source_kind``, ``workflow_mode``,
            ``storage_prefix`` and ``title``.
        """
        job_id = str(job.get("id") or "")
        source_url = str(job.get("url") or "")
        source_kind = str(job.get("source_kind") or "") or infer_source_kind(source_url)
        workflow_mode = str(job.get("workflow_mode") or "") or default_workflow_mode(source_kind)
        document_id = str(job.get("document_id") or "") or job_id
        prefix = str(job.get("storage_prefix") or "") or storage_prefix(document_id, source_kind, workflow_mode)
        return {
            "document_id": document_id,
            "source_kind": source_kind,
            "workflow_mode": workflow_mode,
            "storage_prefix": prefix,
            "title": document_title(source_url, source_kind, document_id),
        }

    def sync_job(self, job: dict[str, Any], job_path: Path) -> None:
        """Mirror one job's state into the documents, jobs and media_assets tables.

        The document and job rows are upserted, keeping their original
        ``created_at``. The job's asset rows are deleted and rebuilt from
        *job*. Everything runs in one transaction. Does nothing when the
        database is disabled or the job has no id.

        Args:
            job: Job state as a plain dict.
            job_path: Directory holding the job's files.
        """
        if not self.enabled:
            return
        now = time.time()
        job_id = str(job.get("id") or "")
        if not job_id:
            return
        doc = self.normalize_job_document(job)
        state_path = job_path / "state.json"
        frames = list(job.get("frames") or [])
        generated_videos = list(job.get("generated_videos") or [])
        metadata = {
            "audio_segment_count": len(job.get("transcript") or []),
            "product_ref_count": len(job.get("product_refs") or []),
            "storyboard_image_count": len(job.get("storyboard_images") or []),
        }
        with self._transaction() as conn:
            existing = conn.execute(
                "SELECT created_at FROM documents WHERE id = ?",
                (doc["document_id"],),
            ).fetchone()
            created_at = float(existing["created_at"]) if existing else now
            conn.execute(
                """
                INSERT INTO documents(
                    id, title, source_kind, workflow_mode, source_url, primary_job_id,
                    status, storage_prefix, metadata_json, created_at, updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(id) DO UPDATE SET
                    title = excluded.title,
                    source_kind = excluded.source_kind,
                    workflow_mode = excluded.workflow_mode,
                    source_url = excluded.source_url,
                    primary_job_id = excluded.primary_job_id,
                    status = excluded.status,
                    storage_prefix = excluded.storage_prefix,
                    metadata_json = excluded.metadata_json,
                    updated_at = excluded.updated_at
                """,
                (
                    doc["document_id"],
                    doc["title"],
                    doc["source_kind"],
                    doc["workflow_mode"],
                    str(job.get("url") or ""),
                    job_id,
                    str(job.get("status") or "created"),
                    doc["storage_prefix"],
                    json.dumps(metadata, ensure_ascii=False),
                    created_at,
                    now,
                ),
            )
            existing_job = conn.execute("SELECT created_at FROM jobs WHERE id = ?", (job_id,)).fetchone()
            job_created_at = float(existing_job["created_at"]) if existing_job else now
            conn.execute(
                """
                INSERT INTO jobs(
                    id, document_id, source_kind, workflow_mode, source_url, status,
                    progress, message, storage_path, state_path, video_url, duration,
                    width, height, frame_count, video_count, error, metadata_json,
                    created_at, updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(id) DO UPDATE SET
                    document_id = excluded.document_id,
                    source_kind = excluded.source_kind,
                    workflow_mode = excluded.workflow_mode,
                    source_url = excluded.source_url,
                    status = excluded.status,
                    progress = excluded.progress,
                    message = excluded.message,
                    storage_path = excluded.storage_path,
                    state_path = excluded.state_path,
                    video_url = excluded.video_url,
                    duration = excluded.duration,
                    width = excluded.width,
                    height = excluded.height,
                    frame_count = excluded.frame_count,
                    video_count = excluded.video_count,
                    error = excluded.error,
                    metadata_json = excluded.metadata_json,
                    updated_at = excluded.updated_at
                """,
                (
                    job_id,
                    doc["document_id"],
                    doc["source_kind"],
                    doc["workflow_mode"],
                    str(job.get("url") or ""),
                    str(job.get("status") or "created"),
                    int(job.get("progress") or 0),
                    str(job.get("message") or ""),
                    str(job_path),
                    str(state_path),
                    str(job.get("video_url") or ""),
                    float(job.get("duration") or 0),
                    int(job.get("width") or 0),
                    int(job.get("height") or 0),
                    len(frames),
                    len(generated_videos),
                    str(job.get("error") or ""),
                    json.dumps(metadata, ensure_ascii=False),
                    job_created_at,
                    now,
                ),
            )
            # Asset rows are derived data: drop this job's rows and rebuild.
            conn.execute("DELETE FROM media_assets WHERE job_id = ?", (job_id,))
            asset_rows = [
                (
                    asset["id"],
                    asset["document_id"],
                    asset["job_id"],
                    asset["kind"],
                    asset["role"],
                    asset.get("path", ""),
                    asset.get("url", ""),
                    asset.get("frame_index"),
                    asset.get("timestamp"),
                    int(asset.get("width") or 0),
                    int(asset.get("height") or 0),
                    float(asset.get("duration") or 0),
                    json.dumps(asset.get("metadata") or {}, ensure_ascii=False),
                    now,
                    now,
                )
                for asset in self._job_assets(job, job_path, doc["document_id"])
            ]
            # OR REPLACE: a repeated asset id (e.g. the same asset listed
            # under two frames) keeps the last entry instead of raising and
            # rolling back the whole job sync.
            conn.executemany(
                """
                INSERT OR REPLACE INTO media_assets(
                    id, document_id, job_id, kind, role, path, url, frame_index,
                    timestamp, width, height, duration, metadata_json, created_at, updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                asset_rows,
            )

    def _job_assets(self, job: dict[str, Any], job_path: Path, document_id: str) -> list[dict[str, Any]]:
        """Flatten a job state dict into one record per media asset.

        Records cover the source video and audio, keyframes with their
        cleaned, generated, scene, cutout and subject assets, product
        references and generated videos. Ids are namespaced with the job id.
        Paths are built from naming conventions under *job_path*; the files
        are not checked for existence, apart from the source video/audio
        presence test.
        """
        job_id = str(job.get("id") or "")
        items: list[dict[str, Any]] = []

        def add(
            asset_id: str,
            kind: str,
            role: str,
            path: Path | str = "",
            url: str = "",
            frame_index: int | None = None,
            timestamp: float | None = None,
            width: int = 0,
            height: int = 0,
            duration: float = 0.0,
            metadata: dict[str, Any] | None = None,
        ) -> None:
            items.append({
                "id": asset_id,
                "document_id": document_id,
                "job_id": job_id,
                "kind": kind,
                "role": role,
                "path": str(path) if path else "",
                "url": url,
                "frame_index": frame_index,
                "timestamp": timestamp,
                "width": width,
                "height": height,
                "duration": duration,
                "metadata": metadata or {},
            })

        if (job_path / "source.mp4").exists() or job.get("video_url"):
            add(
                f"{job_id}:source_video",
                "video",
                "source_video",
                job_path / "source.mp4",
                str(job.get("video_url") or f"/jobs/{job_id}/video.mp4"),
                duration=float(job.get("duration") or 0),
                width=int(job.get("width") or 0),
                height=int(job.get("height") or 0),
            )
        if (job_path / "audio.wav").exists() or job.get("source_audio_url"):
            add(
                f"{job_id}:source_audio",
                "audio",
                "source_audio",
                job_path / "audio.wav",
                str(job.get("source_audio_url") or f"/jobs/{job_id}/audio.wav"),
                duration=float(job.get("duration") or 0),
            )
        for frame in job.get("frames") or []:
            idx = int(frame.get("index") or 0)
            add(
                f"{job_id}:frame:{idx}",
                "image",
                "keyframe",
                job_path / "frames" / f"{idx:03d}.jpg",
                str(frame.get("url") or f"/jobs/{job_id}/frames/{idx}.jpg"),
                frame_index=idx,
                timestamp=float(frame.get("timestamp") or 0),
                metadata={"quality_report": frame.get("quality_report")},
            )
            if frame.get("cleaned_url"):
                add(
                    f"{job_id}:frame:{idx}:cleaned",
                    "image",
                    "cleaned_keyframe",
                    job_path / "cleaned" / f"{idx:03d}.jpg",
                    str(frame.get("cleaned_url")),
                    frame_index=idx,
                    timestamp=float(frame.get("timestamp") or 0),
                )
            for generated in frame.get("generated_images") or []:
                gen_id = str(generated.get("id") or "")
                if gen_id:
                    add(
                        f"{job_id}:generated_image:{idx}:{gen_id}",
                        "image",
                        "generated_image",
                        job_path / "gen" / f"{idx:03d}_{gen_id}.jpg",
                        str(generated.get("url") or ""),
                        frame_index=idx,
                        metadata={"model": generated.get("model"), "mode": generated.get("mode")},
                    )
            for scene_asset in frame.get("scene_assets") or []:
                asset_id = str(scene_asset.get("id") or "")
                if asset_id:
                    add(
                        f"{job_id}:scene_asset:{asset_id}",
                        "image",
                        str(scene_asset.get("asset_role") or "scene_asset"),
                        job_path / "assets" / f"{asset_id}.jpg",
                        str(scene_asset.get("url") or ""),
                        frame_index=idx,
                        width=int(scene_asset.get("width") or 0),
                        height=int(scene_asset.get("height") or 0),
                        metadata={"label": scene_asset.get("label"), "scene_mode": scene_asset.get("scene_mode")},
                    )
            for element in frame.get("elements") or []:
                element_id = str(element.get("id") or "")
                cutout_ids = list(element.get("cutouts") or [])
                # Also pick up the single-valued "cutout_id" field when it
                # is not already part of the "cutouts" list.
                legacy_cutout = element.get("cutout_id")
                if legacy_cutout and legacy_cutout not in cutout_ids:
                    cutout_ids.append(legacy_cutout)
                for cutout_id in cutout_ids:
                    add(
                        f"{job_id}:cutout:{idx}:{element_id}:{cutout_id}",
                        "image",
                        "element_cutout",
                        job_path / "elements" / f"{idx:03d}_{element_id}_{cutout_id}.jpg",
                        f"/jobs/{job_id}/frames/{idx}/elements/{element_id}/cutouts/{cutout_id}.jpg",
                        frame_index=idx,
                        metadata={"element_id": element_id, "name_zh": element.get("name_zh")},
                    )
                for subject_asset in element.get("subject_assets") or []:
                    asset_id = str(subject_asset.get("id") or "")
                    if asset_id:
                        add(
                            f"{job_id}:subject_asset:{asset_id}",
                            "image",
                            "subject_asset",
                            job_path / "assets" / f"{asset_id}.jpg",
                            str(subject_asset.get("url") or ""),
                            frame_index=idx,
                            width=int(subject_asset.get("width") or 0),
                            height=int(subject_asset.get("height") or 0),
                            metadata={"view": subject_asset.get("view"), "label": subject_asset.get("label")},
                        )
        for ref in job.get("product_refs") or []:
            asset_id = str(ref.get("id") or ref.get("asset_id") or ref.get("url") or "")
            if asset_id:
                add(
                    f"{job_id}:product_ref:{asset_id}",
                    "image",
                    "product_ref",
                    self._path_from_job_url(job_path, job_id, str(ref.get("url") or "")),
                    str(ref.get("url") or ""),
                    metadata=ref,
                )
        for video in job.get("generated_videos") or []:
            video_id = str(video.get("id") or "")
            if video_id:
                add(
                    f"{job_id}:generated_video:{video_id}",
                    "video",
                    "generated_video",
                    job_path / "videos" / f"{video_id}.mp4",
                    str(video.get("url") or ""),
                    frame_index=video.get("frame_idx"),
                    duration=float(video.get("duration") or 0),
                    metadata={"status": video.get("status"), "model": video.get("model"), "error": video.get("error")},
                )
        return items

    def _path_from_job_url(self, job_path: Path, job_id: str, url: str) -> str:
        """Map a ``/jobs/<job_id>/...`` URL to a path under *job_path*.

        Returns ``""`` for URLs outside this job's prefix. ``video.mp4`` is
        mapped to ``source.mp4``, the file name used for the source video.
        """
        prefix = f"/jobs/{job_id}/"
        if not url.startswith(prefix):
            return ""
        tail = url[len(prefix):]
        if tail == "video.mp4":
            return str(job_path / "source.mp4")
        return str(job_path / tail)

    def delete_job(self, job_id: str) -> None:
        """Delete a job row and, when it was the last job, its document.

        Asset rows are removed by the ``ON DELETE CASCADE`` foreign keys.
        Does nothing when the database is disabled.
        """
        if not self.enabled:
            return
        with self._transaction() as conn:
            row = conn.execute("SELECT document_id FROM jobs WHERE id = ?", (job_id,)).fetchone()
            conn.execute("DELETE FROM jobs WHERE id = ?", (job_id,))
            if row:
                remaining = conn.execute(
                    "SELECT COUNT(*) AS c FROM jobs WHERE document_id = ?",
                    (row["document_id"],),
                ).fetchone()
                if int(remaining["c"] or 0) == 0:
                    conn.execute("DELETE FROM documents WHERE id = ?", (row["document_id"],))

    def list_documents(self, limit: int | None = None) -> list[dict[str, Any]]:
        """Return documents, most recently updated first, with row counts.

        Args:
            limit: Maximum number of rows; ``None`` or a non-positive value
                means no limit.

        Returns:
            One dict per document with every ``documents`` column plus
            ``job_count`` and ``asset_count``.
        """
        # Correlated COUNT subqueries give the same numbers as joining both
        # child tables and counting DISTINCT ids, without building the
        # jobs x assets cross product for each document.
        sql = """
            SELECT
                d.*,
                (SELECT COUNT(*) FROM jobs j WHERE j.document_id = d.id) AS job_count,
                (SELECT COUNT(*) FROM media_assets a WHERE a.document_id = d.id) AS asset_count
            FROM documents d
            ORDER BY d.updated_at DESC
        """
        params: tuple[Any, ...] = ()
        if limit is not None and limit > 0:
            sql += " LIMIT ?"
            params = (limit,)
        with self._transaction() as conn:
            rows = conn.execute(sql, params).fetchall()
        return [dict(row) for row in rows]

    def health(self) -> dict[str, Any]:
        """Report database status for the health endpoint.

        Returns:
            On success: ``enabled=True``, the redacted URL, the schema
            version and row counts for documents, jobs and assets. When the
            database is disabled or a query fails: ``enabled=False``, the
            redacted URL and an ``error`` string.
        """
        if not self.enabled:
            return {"enabled": False, "url": redact_database_url(self.url), "error": self.error}
        try:
            with self._transaction() as conn:
                docs = conn.execute("SELECT COUNT(*) AS c FROM documents").fetchone()["c"]
                jobs = conn.execute("SELECT COUNT(*) AS c FROM jobs").fetchone()["c"]
                assets = conn.execute("SELECT COUNT(*) AS c FROM media_assets").fetchone()["c"]
            return {
                "enabled": True,
                "url": redact_database_url(self.url),
                "schema_version": SCHEMA_VERSION,
                "documents": int(docs or 0),
                "jobs": int(jobs or 0),
                "assets": int(assets or 0),
            }
        except Exception as e:
            # Health checks must not raise: report the failure instead.
            return {"enabled": False, "url": redact_database_url(self.url), "error": str(e)}
def create_database(url: str, jobs_dir: Path) -> AppDatabase:
    """Build an ``AppDatabase`` for *url* and create its schema before returning it."""
    database = AppDatabase(url, jobs_dir)
    database.init()
    return database

View File

@@ -25,10 +25,19 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel, Field
from database import create_database, default_database_url, default_workflow_mode, infer_source_kind, storage_prefix
# Presumably python-dotenv: load a local .env file before the os.getenv
# lookups below — TODO confirm against the import at the top of the file.
load_dotenv()
# Root directory for per-job working files; created at import time if missing.
JOBS_DIR = Path(os.getenv("JOBS_DIR", "./jobs")).resolve()
JOBS_DIR.mkdir(parents=True, exist_ok=True)
# Metadata database URL, resolved by default_database_url(JOBS_DIR).
DATABASE_URL = default_database_url(JOBS_DIR)
DB_INIT_ERROR = ""
try:
    DB = create_database(DATABASE_URL, JOBS_DIR)
except Exception as e:
    # A database failure must not stop the API from starting: keep DB as None
    # and remember the message so the health endpoint can report it.
    DB = None
    DB_INIT_ERROR = str(e)
CORS_ORIGINS = [o.strip() for o in os.getenv("CORS_ORIGINS", "http://localhost:4290,http://127.0.0.1:4290").split(",") if o.strip()]
PRODUCT_LIBRARY_DIR = Path(
os.getenv("PRODUCT_LIBRARY_DIR", Path(__file__).resolve().parent / "product_library" / "skg-products")
@@ -542,6 +551,10 @@ class AudioScript(BaseModel):
class Job(BaseModel):
id: str
url: str
document_id: str = ""
source_kind: Literal["tiktok_link", "upload", "unknown"] = "unknown"
workflow_mode: Literal["feed_recreation", "uploaded_reference"] = "feed_recreation"
storage_prefix: str = ""
status: JobStatus = "created"
progress: int = 0
message: str = ""
@@ -641,8 +654,26 @@ def job_with_artifacts(job: Job) -> Job:
return job.model_copy(update=updates)
def ensure_job_document_fields(job: Job) -> Job:
    """Fill in the document-related fields of *job* in place and return it.

    - ``source_kind``: kept unless it is ``"unknown"``, in which case it is
      inferred from ``job.url``.
    - ``workflow_mode``: kept when it was explicitly provided, otherwise
      derived from the source kind.
    - ``document_id``: defaults to the job id.
    - ``storage_prefix``: defaults to the prefix built from the three
      values above.
    """
    source_kind = job.source_kind if job.source_kind != "unknown" else infer_source_kind(job.url)
    if source_kind not in {"tiktok_link", "upload"}:
        source_kind = "unknown"
    # `workflow_mode` has a non-empty model default ("feed_recreation"), so a
    # plain `job.workflow_mode or ...` never falls back. A job loaded from a
    # state file written before the field existed would then stay on the
    # default even for uploads. Treat the value as authoritative only when it
    # was explicitly provided (pydantic tracks this in model_fields_set).
    if job.workflow_mode and "workflow_mode" in job.model_fields_set:
        workflow_mode = job.workflow_mode
    else:
        workflow_mode = default_workflow_mode(source_kind)
    if workflow_mode not in {"feed_recreation", "uploaded_reference"}:
        workflow_mode = "feed_recreation"
    document_id = job.document_id or job.id
    job.source_kind = source_kind
    job.workflow_mode = workflow_mode
    job.document_id = document_id
    job.storage_prefix = job.storage_prefix or storage_prefix(document_id, job.source_kind, job.workflow_mode)
    return job
def save_state(job: Job) -> None:
    """Persist *job* to ``state.json`` and mirror it into the metadata database.

    The document fields are normalized first so the state file and the
    database rows agree. The file is written exactly once: writing it before
    normalization would only produce a stale copy that is immediately
    overwritten. Database sync is best-effort; a failure is logged and does
    not prevent the state file from being saved.
    """
    ensure_job_document_fields(job)
    job_path = job_dir(job.id)
    (job_path / "state.json").write_text(job.model_dump_json(indent=2))
    if DB:
        try:
            DB.sync_job(job.model_dump(mode="json"), job_path)
        except Exception as e:
            # state.json stays the source of truth; the DB is only an index.
            print(f"[database sync failed] job={job.id} error={e}", flush=True)
def update(job: Job, **kw) -> None:
@@ -3024,6 +3055,7 @@ def health() -> dict:
"base_url": LLM_BASE_URL or "openai-default",
"image_base_url": IMAGE_BASE_URL or LLM_BASE_URL or "openai-default",
"voice_base_url": AZURE_OPENAI_BASE_URL,
"database": DB.health() if DB else {"enabled": False, "url": DATABASE_URL, "error": DB_INIT_ERROR},
"models": {
"asr": ASR_MODEL,
"local_asr": LOCAL_ASR_MODEL,
@@ -3059,6 +3091,9 @@ def health() -> dict:
class JobSummary(BaseModel):
id: str
document_id: str = ""
source_kind: str = "unknown"
workflow_mode: str = "feed_recreation"
url: str
status: JobStatus
progress: int = 0
@@ -3074,6 +3109,29 @@ class JobSummary(BaseModel):
mtime: float = 0.0
# Response model for one entry of GET /documents. Field names match the keys
# of the row dicts returned by AppDatabase.list_documents().
class DocumentSummary(BaseModel):
    id: str
    title: str
    source_kind: str
    workflow_mode: str
    source_url: str = ""
    primary_job_id: str = ""
    status: str = "created"
    storage_prefix: str = ""
    # Aggregates computed by the list query, not stored columns.
    job_count: int = 0
    asset_count: int = 0
    # Timestamps as stored in the REAL columns of the documents table.
    created_at: float = 0.0
    updated_at: float = 0.0
@app.get("/documents", response_model=list[DocumentSummary])
def list_documents(limit: int | None = None) -> list[DocumentSummary]:
    """Return document summaries from the metadata database.

    Yields an empty list when the database is unavailable. ``limit`` is
    passed straight through to ``DB.list_documents``.
    """
    if not DB:
        return []
    summaries: list[DocumentSummary] = []
    for row in DB.list_documents(limit):
        summaries.append(DocumentSummary(**row))
    return summaries
@app.get("/jobs", response_model=list[JobSummary])
def list_jobs(limit: int | None = None) -> list[JobSummary]:
"""所有 job 的精简列表,按磁盘 state.json mtime 倒序(最新优先)。前端无 ?job= 时用它回填历史。"""
@@ -3082,8 +3140,12 @@ def list_jobs(limit: int | None = None) -> list[JobSummary]:
state_path = JOBS_DIR / job_id / "state.json"
mtime = state_path.stat().st_mtime if state_path.exists() else 0.0
thumb = f"/jobs/{job_id}/frames/{job.frames[0].index}.jpg" if job.frames else ""
ensure_job_document_fields(job)
items.append(JobSummary(
id=job.id,
document_id=job.document_id,
source_kind=job.source_kind,
workflow_mode=job.workflow_mode,
url=job.url,
status=job.status,
progress=job.progress,
@@ -3109,7 +3171,7 @@ async def create_job(req: CreateJobReq, bg: BackgroundTasks) -> Job:
if not req.url.strip():
raise HTTPException(400, "url required")
job_id = uuid.uuid4().hex[:12]
job = Job(id=job_id, url=req.url.strip())
job = Job(id=job_id, url=req.url.strip(), document_id=job_id, source_kind="tiktok_link", workflow_mode="feed_recreation")
JOBS[job_id] = job
save_state(job)
bg.add_task(pipeline_download, job_id)
@@ -3133,7 +3195,7 @@ async def create_job_from_upload(bg: BackgroundTasks, file: UploadFile = File(..
if not mp4.exists() or mp4.stat().st_size == 0:
raise HTTPException(500, "upload failed")
job = Job(id=job_id, url=f"upload://{file.filename}")
job = Job(id=job_id, url=f"upload://{file.filename}", document_id=job_id, source_kind="upload", workflow_mode="uploaded_reference")
JOBS[job_id] = job
save_state(job)
bg.add_task(pipeline_download, job_id)
@@ -3223,6 +3285,11 @@ def delete_job(job_id: str) -> dict[str, bool | str]:
job = JOBS.pop(job_id, None)
if not job and not d.exists():
raise HTTPException(404, "job not found")
if DB:
try:
DB.delete_job(job_id)
except Exception as e:
print(f"[database delete failed] job={job_id} error={e}", flush=True)
if d.exists():
shutil.rmtree(d)
return {"ok": True, "id": job_id}

View File

@@ -3,6 +3,7 @@
# Runtime
JOBS_DIR=/data/jobs
APP_DB_URL=sqlite:////data/jobs/app.db
KEYFRAME_COUNT=6
CORS_ORIGINS=https://marketing.skg.com
API_PORT=4291