feat: queue video generation per user

This commit is contained in:
2026-05-25 15:55:43 +08:00
parent f49d4b248c
commit 779e9b342b
7 changed files with 243 additions and 30 deletions

View File

@@ -9,6 +9,8 @@ WEB_AUTH_SESSION_SECRET=
WEB_AUTH_COOKIE_NAME=skg_marketing_session
WEB_AUTH_COOKIE_SECURE=false
AUTH_DATA_ISOLATION_ENABLED=true
VIDEO_QUEUE_MAX_CONCURRENT=2
VIDEO_QUEUE_MAX_CONCURRENT_PER_USER=1
# 飞书免登录OAuth。生产回调地址需同步配置到飞书开放平台应用安全设置。
FEISHU_APP_ID=

View File

@@ -16,6 +16,7 @@ import threading
import time
import uuid
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Literal
from urllib.parse import urlencode
@@ -269,6 +270,8 @@ FEISHU_ALLOWED_EMAIL_DOMAINS = os.getenv("FEISHU_ALLOWED_EMAIL_DOMAINS", "").str
FEISHU_ALLOWED_EMAILS = os.getenv("FEISHU_ALLOWED_EMAILS", "").strip()
FEISHU_ALLOWED_TENANT_KEYS = os.getenv("FEISHU_ALLOWED_TENANT_KEYS", "").strip()
AUTH_DATA_ISOLATION_ENABLED = os.getenv("AUTH_DATA_ISOLATION_ENABLED", "true").strip().lower() not in {"0", "false", "no", "off"}
VIDEO_QUEUE_MAX_CONCURRENT = max(1, int(os.getenv("VIDEO_QUEUE_MAX_CONCURRENT", "2").strip() or "2"))
VIDEO_QUEUE_MAX_CONCURRENT_PER_USER = max(1, int(os.getenv("VIDEO_QUEUE_MAX_CONCURRENT_PER_USER", "1").strip() or "1"))
PASSWORD_AUTH_CONFIGURED = bool(WEB_AUTH_USERNAME and WEB_AUTH_PASSWORD and WEB_AUTH_SESSION_SECRET)
FEISHU_AUTH_CONFIGURED = bool(FEISHU_APP_ID and FEISHU_APP_SECRET and WEB_AUTH_SESSION_SECRET)
WEB_AUTH_CONFIGURED = bool(PASSWORD_AUTH_CONFIGURED or FEISHU_AUTH_CONFIGURED)
@@ -466,6 +469,9 @@ class GeneratedVideo(BaseModel):
progress: int = 0
error: str = ""
created_at: float = 0.0
queue_position: int = 0
queue_size: int = 0
queue_message: str = ""
class VideoSourceRef(BaseModel):
@@ -907,6 +913,20 @@ AUDIO_WORKERS_RUNNING: set[str] = set()
AUDIO_WORKERS_LOCK = threading.Lock()
@dataclass
class VideoQueueTask:
job_id: str
video_id: str
owner_id: str
args: tuple
created_at: float
VIDEO_QUEUE: list[VideoQueueTask] = []
VIDEO_QUEUE_RUNNING: dict[str, VideoQueueTask] = {}
VIDEO_QUEUE_LOCK = threading.Lock()
def ensure_auth_configured() -> None:
if not WEB_AUTH_CONFIGURED:
raise HTTPException(503, "WEB_AUTH_SESSION_SECRET 以及账号密码或飞书 OAuth 未配置")
@@ -1777,6 +1797,124 @@ def update_generated_video(job_id: str, video_id: str, **kw) -> None:
update(job, generated_videos=updated)
def generated_video_exists(job_id: str, video_id: str) -> bool:
job = JOBS.get(job_id)
return bool(job and any(v.id == video_id for v in job.generated_videos))
def _video_queue_owner(job: Job) -> str:
return (job.owner_id or f"job:{job.id}").strip()
def _video_queue_owner_running_locked(owner_id: str) -> bool:
return any(task.owner_id == owner_id for task in VIDEO_QUEUE_RUNNING.values())
def _refresh_video_queue_positions_locked() -> None:
queue_size = len(VIDEO_QUEUE)
for position, task in enumerate(VIDEO_QUEUE, start=1):
if not generated_video_exists(task.job_id, task.video_id):
continue
if _video_queue_owner_running_locked(task.owner_id):
message = "排队中 · 你的上一个视频生成中"
elif position == 1:
message = "排队中 · 即将开始"
else:
message = f"排队中 · 前方 {position - 1} 个任务"
update_generated_video(
task.job_id,
task.video_id,
status="queued",
progress=0,
queue_position=position,
queue_size=queue_size,
queue_message=message,
)
def _video_queue_running_by_owner_locked() -> dict[str, int]:
counts: dict[str, int] = {}
for task in VIDEO_QUEUE_RUNNING.values():
counts[task.owner_id] = counts.get(task.owner_id, 0) + 1
return counts
def dispatch_video_queue() -> None:
tasks_to_start: list[VideoQueueTask] = []
with VIDEO_QUEUE_LOCK:
running_by_owner = _video_queue_running_by_owner_locked()
while len(VIDEO_QUEUE_RUNNING) < VIDEO_QUEUE_MAX_CONCURRENT:
selected_index = -1
for index, task in enumerate(VIDEO_QUEUE):
if not generated_video_exists(task.job_id, task.video_id):
selected_index = index
break
if running_by_owner.get(task.owner_id, 0) < VIDEO_QUEUE_MAX_CONCURRENT_PER_USER:
selected_index = index
break
if selected_index < 0:
break
task = VIDEO_QUEUE.pop(selected_index)
if not generated_video_exists(task.job_id, task.video_id):
continue
VIDEO_QUEUE_RUNNING[task.video_id] = task
running_by_owner[task.owner_id] = running_by_owner.get(task.owner_id, 0) + 1
update_generated_video(
task.job_id,
task.video_id,
status="in_progress",
progress=1,
queue_position=0,
queue_size=len(VIDEO_QUEUE),
queue_message="准备素材…",
)
tasks_to_start.append(task)
_refresh_video_queue_positions_locked()
for task in tasks_to_start:
threading.Thread(target=_run_video_queue_task, args=(task,), daemon=True).start()
def _run_video_queue_task(task: VideoQueueTask) -> None:
try:
if generated_video_exists(task.job_id, task.video_id):
render_storyboard_video(*task.args)
finally:
with VIDEO_QUEUE_LOCK:
VIDEO_QUEUE_RUNNING.pop(task.video_id, None)
_refresh_video_queue_positions_locked()
dispatch_video_queue()
def enqueue_video_task(job: Job, video_id: str, task_args: tuple) -> None:
task = VideoQueueTask(
job_id=job.id,
video_id=video_id,
owner_id=_video_queue_owner(job),
args=task_args,
created_at=time.time(),
)
with VIDEO_QUEUE_LOCK:
VIDEO_QUEUE.append(task)
_refresh_video_queue_positions_locked()
dispatch_video_queue()
def cancel_queued_video_task(job_id: str, video_id: str) -> bool:
removed = False
with VIDEO_QUEUE_LOCK:
before = len(VIDEO_QUEUE)
VIDEO_QUEUE[:] = [task for task in VIDEO_QUEUE if not (task.job_id == job_id and task.video_id == video_id)]
removed = len(VIDEO_QUEUE) != before
if removed:
_refresh_video_queue_positions_locked()
if removed:
dispatch_video_queue()
return removed
@asynccontextmanager
async def lifespan(_: FastAPI):
try:
@@ -1838,6 +1976,23 @@ async def lifespan(_: FastAPI):
recovered_frames.append(f)
if subject_generation_interrupted:
update(job, frames=recovered_frames, message="服务重启 · 上次主体生成已中断,可重新生成")
video_generation_interrupted = False
recovered_videos = []
for video in job.generated_videos:
if video.status in {"queued", "in_progress"}:
recovered_videos.append(video.model_copy(update={
"status": "failed",
"progress": 100,
"error": "服务重启 · 上次视频生成已中断,请重新生成",
"queue_position": 0,
"queue_size": 0,
"queue_message": "",
}))
video_generation_interrupted = True
else:
recovered_videos.append(video)
if video_generation_interrupted:
update(job, generated_videos=recovered_videos, message="服务重启 · 上次视频生成已中断,请重新生成")
JOBS[p.name] = job
except Exception:
pass
@@ -7850,7 +8005,7 @@ def render_storyboard_video(
product_img = out_dir / f"product_reference_{i}.jpg"
prepare_video_reference(product_ref_path, product_img)
prepared_product_imgs.append(product_img)
update_generated_video(job_id, local_id, status="in_progress", progress=5)
update_generated_video(job_id, local_id, status="in_progress", progress=5, queue_message="准备素材…")
with httpx.Client(timeout=120) as client:
payload = {"model": model, "prompt": prompt, "size": size}
payload[VIDEO_DURATION_FIELD] = seconds
@@ -7880,7 +8035,14 @@ def render_storyboard_video(
status = normalize_video_status(data.get("status"))
progress = video_progress(data, 5)
direct_url = video_url_from_response(data)
update_generated_video(job_id, local_id, provider_id=video_api_id, status=status, progress=progress)
update_generated_video(
job_id,
local_id,
provider_id=video_api_id,
status=status,
progress=progress,
queue_message="生成中…" if status in {"queued", "in_progress"} else "",
)
deadline = time.time() + VIDEO_POLL_TIMEOUT_SECONDS
while status in {"queued", "in_progress"} and time.time() < deadline:
@@ -7891,10 +8053,16 @@ def render_storyboard_video(
status = normalize_video_status(pdata.get("status"))
progress = video_progress(pdata, progress)
direct_url = video_url_from_response(pdata) or direct_url
update_generated_video(job_id, local_id, status=status, progress=progress)
update_generated_video(
job_id,
local_id,
status=status,
progress=progress,
queue_message="生成中…" if status in {"queued", "in_progress"} else "",
)
if status != "completed":
update_generated_video(job_id, local_id, status="failed", error=f"video status: {status}", progress=progress)
update_generated_video(job_id, local_id, status="failed", error=f"video status: {status}", progress=progress, queue_message="")
return
download_generated_video(client, base, headers, video_api_id, direct_url, out_mp4)
@@ -7905,9 +8073,12 @@ def render_storyboard_video(
progress=100,
url=f"/jobs/{job_id}/storyboard-videos/{local_id}.mp4",
error="",
queue_position=0,
queue_size=0,
queue_message="",
)
except Exception as e:
update_generated_video(job_id, local_id, status="failed", error=str(e)[:500])
update_generated_video(job_id, local_id, status="failed", error=str(e)[:500], queue_message="")
@app.post("/jobs/{job_id}/frames/{idx}/storyboard/quick-plan", response_model=StoryboardScene)
@@ -8015,6 +8186,7 @@ def _enqueue_storyboard_videos(job: Job, frame: KeyFrame, req: GenerateStoryboar
source_ref = None
items: list[GeneratedVideo] = []
ids: list[str] = []
queued_tasks: list[tuple[str, tuple]] = []
for i in range(count):
local_id = uuid.uuid4().hex[:12]
ids.append(local_id)
@@ -8033,13 +8205,13 @@ def _enqueue_storyboard_videos(job: Job, frame: KeyFrame, req: GenerateStoryboar
duration=float(seconds),
progress=0,
created_at=time.time(),
queue_message="排队中…",
))
task_args = (job.id, local_id, "", ref_path, variant_prompt, model, seconds, video_size, source_ref, last_ref_path, reference_ref_paths, primary_role)
if bg is not None:
bg.add_task(render_storyboard_video, *task_args)
else:
threading.Thread(target=render_storyboard_video, args=task_args, daemon=True).start()
queued_tasks.append((local_id, task_args))
update(job, generated_videos=items + job.generated_videos, message=f"视频候选已提交 · 分镜 {frame.index + 1} · {count}")
for local_id, task_args in queued_tasks:
enqueue_video_task(job, local_id, task_args)
return ids
@@ -9482,6 +9654,7 @@ def delete_storyboard_video(job_id: str, video_id: str) -> Job:
kept = [v for v in job.generated_videos if v.id != video_id]
if len(kept) == before:
raise HTTPException(404, "generated video not found")
cancel_queued_video_task(job_id, video_id)
out_dir = job_dir(job_id) / "storyboard_videos" / video_id
if out_dir.exists():
try: