auto-save 2026-05-21 15:09 (+1, ~3)

This commit is contained in:
2026-05-21 15:09:40 +08:00
parent 68ecc8b97b
commit b82dad4aa8
4 changed files with 2981 additions and 2286 deletions

View File

@@ -29,6 +29,8 @@ load_dotenv()
JOBS_DIR = Path(os.getenv("JOBS_DIR", "./jobs")).resolve()
JOBS_DIR.mkdir(parents=True, exist_ok=True)
AGENT_RUNS_DIR = Path(os.getenv("AGENT_RUNS_DIR", JOBS_DIR.parent / "agent_runs")).resolve()
AGENT_RUNS_DIR.mkdir(parents=True, exist_ok=True)
CORS_ORIGINS = [o.strip() for o in os.getenv("CORS_ORIGINS", "http://localhost:4290,http://127.0.0.1:4290").split(",") if o.strip()]
PRODUCT_LIBRARY_DIR = Path(
os.getenv("PRODUCT_LIBRARY_DIR", Path(__file__).resolve().parent / "product_library" / "skg-products")
@@ -8011,6 +8013,418 @@ def copy_character_library_assets(job_id: str, req: CopyCharacterLibraryAssetReq
}
class AgentRunLog(BaseModel):
ts: float
level: Literal["info", "warn", "error"] = "info"
message: str
class AgentRun(BaseModel):
id: str
job_id: str
status: Literal["draft", "queued", "executing", "reviewing", "completed", "failed"] = "queued"
stage: str = "queued"
progress: int = 0
logs: list[AgentRunLog] = Field(default_factory=list)
video_ids: list[str] = Field(default_factory=list)
final_video_url: str = ""
contact_sheet_url: str = ""
error: str = ""
created_at: float = Field(default_factory=time.time)
updated_at: float = Field(default_factory=time.time)
AGENT_RUNS: dict[str, AgentRun] = {}
AGENT_DEFAULT_PRODUCT_IDS = [
"desktop-skg-product-angle-01",
"desktop-skg-product-angle-02",
"desktop-skg-product-angle-03",
"desktop-skg-product-angle-04",
]
AGENT_DEFAULT_CHARACTER_ID = os.getenv("AGENT_DEFAULT_CHARACTER_ID", "character-02").strip() or "character-02"
AGENT_SHOT_COUNT = max(8, min(12, int(os.getenv("AGENT_SHOT_COUNT", "12"))))
AGENT_SHOT_DURATION_SECONDS = max(4.0, min(8.0, float(os.getenv("AGENT_SHOT_DURATION_SECONDS", "5"))))
AGENT_VIDEO_TIMEOUT_SECONDS = max(300, int(os.getenv("AGENT_VIDEO_TIMEOUT_SECONDS", "1500")))
def agent_run_dir(run_id: str) -> Path:
return AGENT_RUNS_DIR / run_id
def agent_run_path(run_id: str) -> Path:
return agent_run_dir(run_id) / "state.json"
def save_agent_run(run: AgentRun) -> None:
run.updated_at = time.time()
d = agent_run_dir(run.id)
d.mkdir(parents=True, exist_ok=True)
agent_run_path(run.id).write_text(run.model_dump_json(indent=2), encoding="utf-8")
AGENT_RUNS[run.id] = run
def agent_log(
run: AgentRun,
message: str,
*,
stage: str | None = None,
progress: int | None = None,
status: Literal["draft", "queued", "executing", "reviewing", "completed", "failed"] | None = None,
level: Literal["info", "warn", "error"] = "info",
) -> None:
if stage is not None:
run.stage = stage
if progress is not None:
run.progress = max(0, min(100, int(progress)))
if status is not None:
run.status = status
run.logs = (run.logs + [AgentRunLog(ts=time.time(), level=level, message=message)])[-240:]
save_agent_run(run)
async def save_agent_product_upload(job_id: str, upload: UploadFile, index: int) -> dict:
if not upload.filename:
raise HTTPException(400, "product image filename required")
content_type = (upload.content_type or "").lower()
suffix = Path(upload.filename).suffix.lower()
if content_type and not content_type.startswith("image/"):
raise HTTPException(400, f"product image must be image/*, got {content_type}")
if not content_type and suffix not in {".jpg", ".jpeg", ".png", ".webp", ".bmp"}:
raise HTTPException(400, f"unsupported product image: {suffix}")
out_dir = job_dir(job_id) / "assets"
out_dir.mkdir(parents=True, exist_ok=True)
asset_id = uuid.uuid4().hex[:12]
tmp = out_dir / f"{asset_id}.upload"
out = out_dir / f"{asset_id}.jpg"
try:
await _save_upload_to_path(upload, tmp)
meta = normalize_product_asset_image(tmp, out)
except Exception as e:
try:
out.unlink()
except OSError:
pass
raise HTTPException(400, f"product upload failed: {e}")
finally:
try:
tmp.unlink()
except OSError:
pass
return {
"kind": "asset",
"frame_idx": -1,
"element_id": asset_id,
"cutout_id": asset_id,
"label": f"用户产品图 {index} · {upload.filename}",
"asset_meta": meta,
}
def agent_fallback_product_refs(job_id: str) -> list[dict]:
refs: list[dict] = []
for product_id in AGENT_DEFAULT_PRODUCT_IDS:
try:
refs.append(copy_product_library_asset(job_id, CopyProductLibraryAssetReq(product_id=product_id)))
except Exception:
continue
return refs
def agent_subject_refs(job_id: str) -> list[dict]:
try:
payload = copy_character_library_assets(job_id, CopyCharacterLibraryAssetReq(character_id=AGENT_DEFAULT_CHARACTER_ID))
except Exception:
return []
images = payload.get("images") or []
preferred = []
for ref in images:
label = str(ref.get("label") or "")
if any(key in label for key in ("正面", "左45", "半身近景", "侧面")):
preferred.append(ref)
return (preferred or images)[:4]
def agent_base_prompt() -> str:
return (
"Vertical 9:16 original SKG short-form ad. Do not copy the real person from the source video. "
"Use the provided transparent anatomy subject as the recurring character when a person is needed. "
"Use the provided SKG white U-shaped neck-and-shoulder massager product references as rigid product truth: "
"one clean U-shaped wearable device, silver contact pads, red heat/light accents, premium white shell, correct scale around the neck and shoulders. "
"No captions, no platform UI, no watermark, no medical treatment claims. Natural creator-demo pacing, clean premium lighting."
)
def agent_shot_plan() -> list[dict]:
base = agent_base_prompt()
shots = [
("hook", "Hook close-up: transparent anatomy character faces camera and raises the SKG neck-and-shoulder massager into the foreground, fast creator-ad opening energy, clean blue-white studio background."),
("pain", "Pain-point scene: the character sits at a desk after long screen work, shoulders tense, then notices the SKG massager beside the laptop; show neck and shoulder area clearly."),
("product_macro", "Macro product detail: slow moving close-up across the SKG U-shaped device, buttons, inner massage nodes, silver pads, premium white plastic and red heat accents."),
("wear", "Wear demo: the character places the SKG U-shaped massager externally around the back of the neck and upper shoulders, hands guiding both arms into position."),
("contact", "Heat/contact moment: close-up of silver massage pads aligned with side neck and upper trapezius, subtle red warmth glow, product outside the transparent body, no clipping."),
("office_use", "Office use beat: the character works calmly at a desk while wearing the SKG massager, small relief gesture, device stable and visible around neck and shoulders."),
("living_room", "Comfort beat: relaxed home setting, character leans back slightly, SKG device running, premium wellness mood, smooth gentle camera drift."),
("angle_proof", "Product angle proof: clean tabletop shot with the SKG U-shaped massager rotating or being lifted by hand, show thickness, contact pads, seams, and control button."),
("mobility", "Daily mobility scene: character walks from desk to sofa wearing the SKG massager, lightweight lifestyle demonstration, product silhouette remains accurate."),
("benefit", "Benefit visualization: transparent anatomy view emphasizes neck and shoulder contact zones with tasteful red warmth accents while the device stays opaque and external."),
("packaging", "Brand proof shot: SKG product and packaging on a clean surface, hand picks up the device, premium white product photography look, no extra text overlays."),
("cta", "Ending CTA: character faces camera wearing the SKG massager, then the final frame lands on a clean product hero angle with confident premium ad finish."),
]
return [{"key": key, "prompt": f"{base}\n\nShot direction: {text}"} for key, text in shots[:AGENT_SHOT_COUNT]]
def agent_reference_for_shot(shot_key: str, product_refs: list[dict], subject_refs: list[dict]) -> tuple[dict | None, str]:
product_first = {"product_macro", "angle_proof", "packaging"}
if shot_key in product_first and product_refs:
return product_refs[min(2, len(product_refs) - 1)], "reference_image"
if subject_refs:
if shot_key in {"contact", "benefit"} and len(subject_refs) > 1:
return subject_refs[min(1, len(subject_refs) - 1)], "reference_image"
return subject_refs[0], "reference_image"
if product_refs:
return product_refs[0], "reference_image"
return None, "reference_image"
def agent_get_video(job_id: str, video_id: str) -> GeneratedVideo | None:
job = JOBS.get(job_id)
if not job:
return None
return next((item for item in job.generated_videos if item.id == video_id), None)
def agent_wait_videos(run: AgentRun, ids: list[str], *, target_completed: int) -> list[str]:
deadline = time.time() + AGENT_VIDEO_TIMEOUT_SECONDS
last_summary = ""
while time.time() < deadline:
completed: list[str] = []
active = 0
failed = 0
for video_id in ids:
item = agent_get_video(run.job_id, video_id)
if not item:
active += 1
continue
if item.status == "completed" and item.url:
completed.append(video_id)
elif item.status == "failed":
failed += 1
else:
active += 1
summary = f"视频生成中 · 完成 {len(completed)}/{target_completed} · 运行 {active} · 失败 {failed}"
if summary != last_summary:
agent_log(run, summary, stage="execute", progress=58 + min(24, len(completed) * 2))
last_summary = summary
if len(completed) >= target_completed or active == 0:
return completed
time.sleep(6)
return [video_id for video_id in ids if (agent_get_video(run.job_id, video_id) and agent_get_video(run.job_id, video_id).status == "completed")]
def agent_submit_shot(
run: AgentRun,
frame: KeyFrame,
shot: dict,
product_refs: list[dict],
subject_refs: list[dict],
retry: int = 0,
) -> str:
first_ref, primary_role = agent_reference_for_shot(str(shot["key"]), product_refs, subject_refs)
if not first_ref:
raise RuntimeError("no reference image available for video generation")
job = JOBS[run.job_id]
prompt = str(shot["prompt"])
if retry:
prompt += f"\n\nRetry pass {retry}: keep the same idea but simplify motion, keep the product shape stable, avoid strange anatomy or deformed product."
req = GenerateStoryboardVideoReq(
prompt=prompt,
duration=AGENT_SHOT_DURATION_SECONDS,
count=1,
storyboard_row_idx=len(run.video_ids),
first_image=first_ref,
product_images=product_refs[:6],
subject_images=subject_refs[:4],
model="seedance",
size="720x1280",
)
# _enqueue_storyboard_videos derives the primary role from first_image. Keep the
# local variable above for future provider-specific tuning without changing API.
_ = primary_role
ids = _enqueue_storyboard_videos(job, frame, req, None)
return ids[0]
def agent_compose_final(run: AgentRun, ordered_ids: list[str]) -> None:
d = agent_run_dir(run.id)
d.mkdir(parents=True, exist_ok=True)
final_dir = job_dir(run.job_id) / "final"
final_dir.mkdir(parents=True, exist_ok=True)
final = final_dir / f"agent-{run.id}.mp4"
concat_file = d / "concat.txt"
paths: list[Path] = []
for video_id in ordered_ids:
p = job_dir(run.job_id) / "storyboard_videos" / video_id / "video.mp4"
if p.exists() and p.stat().st_size > 0:
paths.append(p.resolve())
if not paths:
raise RuntimeError("no completed video files to compose")
concat_file.write_text("".join(f"file '{str(p).replace(chr(39), chr(39) + chr(92) + chr(39) + chr(39))}'\n" for p in paths), encoding="utf-8")
try:
run_cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(concat_file), "-c", "copy", "-movflags", "+faststart", str(final)]
run(run_cmd)
except Exception:
run_cmd = [
"ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(concat_file),
"-vf", "scale=720:1280,setsar=1", "-r", "24", "-c:v", "mpeg4", "-q:v", "4",
"-c:a", "aac", "-b:a", "160k", "-movflags", "+faststart", str(final),
]
run(run_cmd)
contact = d / "contact.jpg"
try:
run([
"ffmpeg", "-y", "-i", str(final),
"-vf", "select='not(mod(n,120))',scale=180:320,tile=12x1",
"-frames:v", "1", str(contact),
])
run.contact_sheet_url = f"/agent-runs/{run.id}/contact.jpg"
except Exception as e:
agent_log(run, f"抽帧审片图生成失败:{str(e)[:180]}", level="warn")
run.final_video_url = f"/agent-runs/{run.id}/final.mp4"
save_agent_run(run)
def agent_run_worker(run_id: str, product_refs: list[dict]) -> None:
run = AGENT_RUNS[run_id]
try:
agent_log(run, "接管任务:创建 1 分钟二创出片流程", status="executing", stage="download", progress=4)
pipeline_download(run.job_id)
job = JOBS[run.job_id]
if job.status == "failed":
raise RuntimeError(job.error or job.message or "source video download failed")
agent_log(run, f"源视频就绪 · {job.duration:.1f}s · {job.width}x{job.height}", stage="download", progress=14)
refs = product_refs[:6] or agent_fallback_product_refs(run.job_id)
if not refs:
raise RuntimeError("需要至少 1 张产品图")
update(job, product_refs=refs, message=f"Agent 已接入产品图 · {len(refs)}")
agent_log(run, f"产品素材就绪 · {len(refs)}", stage="assets", progress=20)
subject_refs = agent_subject_refs(run.job_id)
if subject_refs:
agent_log(run, f"主体参考就绪 · {len(subject_refs)} 张透明骨架角色", stage="assets", progress=24)
else:
agent_log(run, "未找到主体角色库,改用产品图和文本约束生成", stage="assets", progress=24, level="warn")
agent_log(run, "抽取源视频节奏帧 · 12 张", stage="analyze", progress=28)
pipeline_analyze(run.job_id, frame_count=12, target="transparent_human", mode="replace", quality="auto")
job = JOBS[run.job_id]
if not job.frames:
raise RuntimeError(job.error or "keyframe extraction failed")
agent_log(run, f"节奏帧完成 · {len(job.frames)}", stage="plan", progress=40)
shots = agent_shot_plan()
agent_log(run, f"生成二创镜头计划 · {len(shots)}× {AGENT_SHOT_DURATION_SECONDS:g}s", stage="plan", progress=46)
submitted: list[str] = []
for idx, shot in enumerate(shots):
frame = job.frames[idx % len(job.frames)]
video_id = agent_submit_shot(run, frame, shot, refs, subject_refs)
submitted.append(video_id)
run.video_ids = submitted
save_agent_run(run)
agent_log(run, f"提交镜头 {idx + 1:02d}/{len(shots)} · {shot['key']} · {video_id}", stage="execute", progress=48 + idx)
completed = agent_wait_videos(run, submitted, target_completed=len(shots))
failed_positions = [i for i, video_id in enumerate(submitted) if video_id not in completed]
if failed_positions:
agent_log(run, f"{len(failed_positions)} 段未完成,自动重跑一次", stage="execute", progress=82, level="warn")
for pos in failed_positions:
frame = job.frames[pos % len(job.frames)]
retry_id = agent_submit_shot(run, frame, shots[pos], refs, subject_refs, retry=1)
submitted[pos] = retry_id
run.video_ids = submitted
save_agent_run(run)
agent_log(run, f"重跑镜头 {pos + 1:02d} · {retry_id}", stage="execute", progress=83)
completed = agent_wait_videos(run, submitted, target_completed=len(shots))
ordered_completed = [video_id for video_id in submitted if video_id in completed]
if len(ordered_completed) < max(8, len(shots) - 2):
raise RuntimeError(f"可用镜头不足:{len(ordered_completed)}/{len(shots)}")
agent_log(run, f"自动审片通过 · 可用 {len(ordered_completed)}/{len(shots)}", status="reviewing", stage="review", progress=88)
agent_log(run, "合成最终成片", stage="compose", progress=92)
agent_compose_final(run, ordered_completed)
agent_log(run, f"成片完成 · {len(ordered_completed)}", status="completed", stage="final", progress=100)
except Exception as e:
run.error = str(e)[:600]
agent_log(run, f"任务失败:{run.error}", status="failed", stage="failed", progress=100, level="error")
@app.post("/agent-runs", response_model=AgentRun)
async def create_agent_run(
tk_url: str = Form(...),
product_files: list[UploadFile] | None = File(None),
) -> AgentRun:
if not tk_url.strip():
raise HTTPException(400, "tk_url required")
job_id = uuid.uuid4().hex[:12]
run_id = uuid.uuid4().hex[:12]
job = Job(id=job_id, url=tk_url.strip())
JOBS[job_id] = job
save_state(job)
refs: list[dict] = []
for index, upload in enumerate((product_files or [])[:6], start=1):
refs.append(await save_agent_product_upload(job_id, upload, index))
run = AgentRun(id=run_id, job_id=job_id, status="queued", stage="queued", progress=1)
save_agent_run(run)
agent_log(run, f"任务已入队 · job={job_id} · 产品图 {len(refs)}", status="queued", stage="queued", progress=1)
threading.Thread(target=agent_run_worker, args=(run_id, refs), daemon=True).start()
return run
@app.get("/agent-runs", response_model=list[AgentRun])
def list_agent_runs(limit: int = 20) -> list[AgentRun]:
for p in AGENT_RUNS_DIR.iterdir():
if p.is_dir() and (p / "state.json").exists() and p.name not in AGENT_RUNS:
try:
AGENT_RUNS[p.name] = AgentRun.model_validate_json((p / "state.json").read_text(encoding="utf-8"))
except Exception:
pass
items = list(AGENT_RUNS.values())
items.sort(key=lambda item: item.updated_at, reverse=True)
return items[:max(1, min(100, limit))]
@app.get("/agent-runs/{run_id}", response_model=AgentRun)
def get_agent_run(run_id: str) -> AgentRun:
run = AGENT_RUNS.get(run_id)
if not run and agent_run_path(run_id).exists():
run = AgentRun.model_validate_json(agent_run_path(run_id).read_text(encoding="utf-8"))
AGENT_RUNS[run_id] = run
if not run:
raise HTTPException(404, "agent run not found")
return run
@app.get("/agent-runs/{run_id}/final.mp4")
def get_agent_run_final(run_id: str):
run = get_agent_run(run_id)
p = job_dir(run.job_id) / "final" / f"agent-{run.id}.mp4"
if not p.exists():
raise HTTPException(404, "final video not found")
return FileResponse(p, media_type="video/mp4")
@app.get("/agent-runs/{run_id}/contact.jpg")
def get_agent_run_contact(run_id: str):
p = agent_run_dir(run_id) / "contact.jpg"
if not p.exists():
raise HTTPException(404, "contact sheet not found")
return FileResponse(p, media_type="image/jpeg")
def product_image_alpha(img: Image.Image) -> Image.Image:
rgba = img.convert("RGBA")
rgb = rgba.convert("RGB")