auto-save 2026-05-13 10:55 (~4)
This commit is contained in:
95
api/main.py
95
api/main.py
@@ -79,7 +79,8 @@ class KeyFrame(BaseModel):
|
||||
timestamp: float
|
||||
url: str
|
||||
description: dict | None = None # vision 模型识别结果 {scene, objects, style, suggested_prompt}
|
||||
cleaned_url: str | None = None # 清洗后干净版 → /jobs/{id}/frames/{idx}/cleaned.jpg
|
||||
cleaned_url: str | None = None # 清洗后干净版(待应用)→ /jobs/{id}/frames/{idx}/cleaned.jpg
|
||||
cleaned_applied: bool = False # 是否已用清洗版替换原图(替换后 cleaned_url=null)
|
||||
elements: list[KeyElement] = [] # 提取的元素清单(持久化)
|
||||
generated_images: list[GeneratedImage] = []
|
||||
|
||||
@@ -978,10 +979,36 @@ def describe_frame(job_id: str, idx: int) -> Job:
|
||||
|
||||
# ---------- 清洗水印 / 元素提取(关键帧二阶段加工) ----------
|
||||
|
||||
class CleanupReq(BaseModel):
|
||||
# 可选 region:相对坐标 0-1,限制清洗范围
|
||||
region: dict | None = None # {"x": float, "y": float, "w": float, "h": float}
|
||||
|
||||
|
||||
def _region_to_phrase(r: dict) -> str:
|
||||
"""把相对坐标矩形转成方位描述给 prompt 用"""
|
||||
x = max(0.0, min(1.0, float(r.get("x", 0))))
|
||||
y = max(0.0, min(1.0, float(r.get("y", 0))))
|
||||
w = max(0.0, min(1.0 - x, float(r.get("w", 0))))
|
||||
h = max(0.0, min(1.0 - y, float(r.get("h", 0))))
|
||||
if w <= 0 or h <= 0:
|
||||
return ""
|
||||
cx, cy = x + w / 2, y + h / 2
|
||||
hpos = "left" if cx < 0.4 else "right" if cx > 0.6 else "center"
|
||||
vpos = "top" if cy < 0.4 else "bottom" if cy > 0.6 else "middle"
|
||||
quadrant = f"{vpos}-{hpos}" if hpos != "center" else vpos
|
||||
x_pct = (int(x * 100), int((x + w) * 100))
|
||||
y_pct = (int(y * 100), int((y + h) * 100))
|
||||
return (
|
||||
f"the {quadrant} area of the image "
|
||||
f"(roughly horizontal {x_pct[0]}%-{x_pct[1]}%, vertical {y_pct[0]}%-{y_pct[1]}%)"
|
||||
)
|
||||
|
||||
|
||||
@app.post("/jobs/{job_id}/frames/{idx}/cleanup", response_model=Job)
|
||||
def cleanup_frame(job_id: str, idx: int) -> Job:
|
||||
def cleanup_frame(job_id: str, idx: int, req: CleanupReq | None = None) -> Job:
|
||||
"""调 nano-banana image edit 清洗关键帧:去水印 / @用户名 / 字幕 / 平台 logo。
|
||||
输出干净版到 jobs/<id>/cleaned/<idx>.jpg,写回 frame.cleaned_url。"""
|
||||
输出干净版到 jobs/<id>/cleaned/<idx>.jpg,写回 frame.cleaned_url。
|
||||
可选 region: 限定只清洗框内区域。"""
|
||||
import time as _time
|
||||
job = JOBS.get(job_id)
|
||||
if not job:
|
||||
@@ -993,14 +1020,17 @@ def cleanup_frame(job_id: str, idx: int) -> Job:
|
||||
if not frame_path.exists():
|
||||
raise HTTPException(404, "frame file missing")
|
||||
|
||||
prompt = (
|
||||
"Clean this image by removing all overlay graphics that obstruct the main content: "
|
||||
"watermarks, social media usernames or @handles, platform logos (TikTok, Instagram, etc.), "
|
||||
"subtitles, captions, overlay text, sticker text, hashtags. "
|
||||
"Keep all original scene elements (characters, props, background, lighting) intact. "
|
||||
"The result should look like the same photograph with overlay UI removed — "
|
||||
"natural, seamless, no visible patches or artifacts."
|
||||
)
|
||||
region_phrase = _region_to_phrase(req.region) if (req and req.region) else ""
|
||||
if region_phrase:
|
||||
prompt = (
|
||||
f"Remove text overlays only within {region_phrase}: watermarks, usernames, captions, hashtags, "
|
||||
"platform logos. Keep every other part of the image exactly unchanged."
|
||||
)
|
||||
else:
|
||||
prompt = (
|
||||
"Remove all text overlays from this image: watermarks, usernames, captions, hashtags, "
|
||||
"platform logos. Keep the rest of the scene intact and natural."
|
||||
)
|
||||
try:
|
||||
img_bytes, _mode = _image_edit_call(frame_path, prompt, fallback_text=False, max_attempts=3)
|
||||
except RuntimeError as e:
|
||||
@@ -1015,6 +1045,7 @@ def cleanup_frame(job_id: str, idx: int) -> Job:
|
||||
for f in job.frames:
|
||||
if f.index == idx:
|
||||
f.cleaned_url = f"/jobs/{job_id}/frames/{idx}/cleaned.jpg?t={int(_time.time())}"
|
||||
f.cleaned_applied = False # 重新清洗:重置"已应用"状态
|
||||
new_frames.append(f)
|
||||
update(job, frames=new_frames, message=f"清洗完成 · 分镜 {idx + 1}")
|
||||
return job
|
||||
@@ -1028,6 +1059,48 @@ def get_cleaned_frame(job_id: str, idx: int):
|
||||
return FileResponse(p, media_type="image/jpeg")
|
||||
|
||||
|
||||
@app.post("/jobs/{job_id}/frames/{idx}/cleanup/apply", response_model=Job)
|
||||
def apply_cleaned(job_id: str, idx: int) -> Job:
|
||||
"""用清洗版替换原关键帧:物理覆盖 frames/{idx}.jpg ← cleaned/{idx}.jpg。
|
||||
原图作备份 → orig/{idx}.jpg(首次替换时备份,后续替换跳过)。
|
||||
替换后 frame.cleaned_url 清空(不再有"待应用"清洗版)"""
|
||||
import shutil as _shutil
|
||||
job = JOBS.get(job_id)
|
||||
if not job:
|
||||
raise HTTPException(404, "job not found")
|
||||
frame = next((f for f in job.frames if f.index == idx), None)
|
||||
if not frame:
|
||||
raise HTTPException(404, "frame not found")
|
||||
cleaned_path = job_dir(job_id) / "cleaned" / f"{idx:03d}.jpg"
|
||||
if not cleaned_path.exists():
|
||||
raise HTTPException(404, "no cleaned version to apply")
|
||||
frame_path = job_dir(job_id) / "frames" / f"{idx:03d}.jpg"
|
||||
|
||||
# 首次替换:把原图备份到 orig/{idx}.jpg
|
||||
orig_dir = job_dir(job_id) / "orig"
|
||||
orig_dir.mkdir(parents=True, exist_ok=True)
|
||||
orig_backup = orig_dir / f"{idx:03d}.jpg"
|
||||
if not orig_backup.exists() and frame_path.exists():
|
||||
_shutil.copy2(frame_path, orig_backup)
|
||||
|
||||
# 用 cleaned 覆盖 frames/
|
||||
_shutil.copy2(cleaned_path, frame_path)
|
||||
# 删 cleaned 文件(已经"应用",不再是单独的待选版本)
|
||||
try:
|
||||
cleaned_path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
new_frames = []
|
||||
for f in job.frames:
|
||||
if f.index == idx:
|
||||
f.cleaned_url = None
|
||||
f.cleaned_applied = True
|
||||
new_frames.append(f)
|
||||
update(job, frames=new_frames, message=f"已替换分镜 {idx + 1} 为清洗版")
|
||||
return job
|
||||
|
||||
|
||||
class AddElementReq(BaseModel):
|
||||
name_zh: str
|
||||
name_en: str = ""
|
||||
|
||||
Reference in New Issue
Block a user